Merge branch 'websocket_rewrite'
This commit is contained in:
@@ -9,7 +9,6 @@ import (
|
||||
|
||||
"golang.org/x/crypto/acme/autocert"
|
||||
|
||||
"github.com/googollee/go-socket.io"
|
||||
gh "github.com/gorilla/handlers"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/play-with-docker/play-with-docker/config"
|
||||
@@ -21,7 +20,6 @@ import (
|
||||
|
||||
var core pwd.PWDApi
|
||||
var e event.EventApi
|
||||
var ws *socketio.Server
|
||||
|
||||
type HandlerExtender func(h *mux.Router)
|
||||
|
||||
@@ -31,15 +29,6 @@ func Bootstrap(c pwd.PWDApi, ev event.EventApi) {
|
||||
}
|
||||
|
||||
func Register(extend HandlerExtender) {
|
||||
server, err := socketio.NewServer(nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
server.On("connection", WS)
|
||||
server.On("error", WSError)
|
||||
|
||||
RegisterEvents(server)
|
||||
|
||||
r := mux.NewRouter()
|
||||
corsRouter := mux.NewRouter()
|
||||
|
||||
@@ -71,7 +60,7 @@ func Register(extend HandlerExtender) {
|
||||
http.ServeFile(rw, r, "www/sdk.js")
|
||||
})
|
||||
|
||||
corsRouter.Handle("/sessions/{sessionId}/ws/", server)
|
||||
corsRouter.HandleFunc("/sessions/{sessionId}/ws/", WSH)
|
||||
r.Handle("/metrics", promhttp.Handler())
|
||||
|
||||
// Generic routes
|
||||
@@ -129,12 +118,3 @@ func Register(extend HandlerExtender) {
|
||||
log.Fatal(httpServer.ListenAndServe())
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterEvents(s *socketio.Server) {
|
||||
ws = s
|
||||
e.OnAny(broadcastEvent)
|
||||
}
|
||||
|
||||
func broadcastEvent(eventType event.EventType, sessionId string, args ...interface{}) {
|
||||
ws.BroadcastTo(sessionId, eventType.String(), args...)
|
||||
}
|
||||
|
||||
154
handlers/ws.go
154
handlers/ws.go
@@ -1,14 +1,140 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/googollee/go-socket.io"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/twinj/uuid"
|
||||
)
|
||||
|
||||
func WS(so socketio.Socket) {
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
type message struct {
|
||||
Name string `json:"name"`
|
||||
Args []interface{} `json:"args"`
|
||||
}
|
||||
|
||||
type socket struct {
|
||||
c *websocket.Conn
|
||||
mx sync.Mutex
|
||||
listeners map[string][]func(args ...interface{})
|
||||
r *http.Request
|
||||
id string
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newSocket(r *http.Request, c *websocket.Conn) *socket {
|
||||
return &socket{
|
||||
c: c,
|
||||
listeners: map[string][]func(args ...interface{}){},
|
||||
r: r,
|
||||
id: uuid.NewV4().String(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *socket) Id() string {
|
||||
return s.id
|
||||
}
|
||||
|
||||
func (s *socket) Request() *http.Request {
|
||||
return s.r
|
||||
}
|
||||
|
||||
func (s *socket) Close() {
|
||||
s.closed = true
|
||||
s.onMessage(message{Name: "close"})
|
||||
}
|
||||
|
||||
func (s *socket) process() {
|
||||
defer s.Close()
|
||||
for {
|
||||
mt, m, err := s.c.ReadMessage()
|
||||
if err != nil {
|
||||
log.Printf("Error reading message from websocket. Got: %v\n", err)
|
||||
break
|
||||
}
|
||||
if mt != websocket.TextMessage {
|
||||
log.Printf("Received websocket message, but it is not a text message.\n")
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
var msg message
|
||||
if err := json.Unmarshal(m, &msg); err != nil {
|
||||
log.Printf("Cannot unmarshal message received from websocket. Got: %v\n", err)
|
||||
return
|
||||
}
|
||||
s.onMessage(msg)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *socket) onMessage(msg message) {
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
|
||||
cbs, found := s.listeners[msg.Name]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
for _, cb := range cbs {
|
||||
go cb(msg.Args...)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *socket) Emit(ev string, args ...interface{}) {
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
|
||||
if s.closed {
|
||||
return
|
||||
}
|
||||
|
||||
m := message{Name: ev, Args: args}
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
log.Printf("Cannot marshal event to json. Got: %v\n", err)
|
||||
return
|
||||
}
|
||||
if err := s.c.WriteMessage(websocket.TextMessage, b); err != nil {
|
||||
log.Printf("Cannot write event to websocket connection. Got: %v\n", err)
|
||||
s.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *socket) On(ev string, cb func(args ...interface{})) {
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
listeners, found := s.listeners[ev]
|
||||
if !found {
|
||||
listeners = []func(args ...interface{}){}
|
||||
}
|
||||
listeners = append(listeners, cb)
|
||||
s.listeners[ev] = listeners
|
||||
}
|
||||
|
||||
func WSH(w http.ResponseWriter, r *http.Request) {
|
||||
c, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Print("upgrade:", err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
s := newSocket(r, c)
|
||||
ws(s)
|
||||
s.process()
|
||||
}
|
||||
|
||||
func ws(so *socket) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
fmt.Println("Recovered from ", r)
|
||||
@@ -27,8 +153,6 @@ func WS(so socketio.Socket) {
|
||||
|
||||
client := core.ClientNew(so.Id(), session)
|
||||
|
||||
so.Join(session.Id)
|
||||
|
||||
m, err := NewManager(session)
|
||||
if err != nil {
|
||||
log.Printf("Error creating terminal manager. Got: %v", err)
|
||||
@@ -48,26 +172,32 @@ func WS(so socketio.Socket) {
|
||||
return
|
||||
}
|
||||
|
||||
so.On("session close", func() {
|
||||
so.On("session close", func(args ...interface{}) {
|
||||
m.Close()
|
||||
core.SessionClose(session)
|
||||
})
|
||||
|
||||
so.On("instance terminal in", func(name, data string) {
|
||||
so.On("instance terminal in", func(args ...interface{}) {
|
||||
name := args[0].(string)
|
||||
data := args[1].(string)
|
||||
m.Send(name, []byte(data))
|
||||
})
|
||||
|
||||
so.On("instance viewport resize", func(cols, rows uint) {
|
||||
so.On("instance viewport resize", func(args ...interface{}) {
|
||||
// User resized his viewport
|
||||
core.ClientResizeViewPort(client, cols, rows)
|
||||
cols := args[0].(float64)
|
||||
rows := args[1].(float64)
|
||||
core.ClientResizeViewPort(client, uint(cols), uint(rows))
|
||||
})
|
||||
|
||||
so.On("disconnection", func() {
|
||||
so.On("close", func(args ...interface{}) {
|
||||
m.Close()
|
||||
core.ClientClose(client)
|
||||
})
|
||||
}
|
||||
|
||||
func WSError(so socketio.Socket) {
|
||||
log.Println("error ws")
|
||||
e.OnAny(func(eventType event.EventType, sessionId string, args ...interface{}) {
|
||||
if session.Id == sessionId {
|
||||
so.Emit(eventType.String(), args...)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user