From b1406f4ea808d6c8609faae5a9b90d30fffef7cb Mon Sep 17 00:00:00 2001 From: "Jonathan Leibiusky (@xetorthio)" Date: Thu, 19 Oct 2017 13:49:14 +0200 Subject: [PATCH] First try on using standard websocket and removing socket.io from the middle --- handlers/bootstrap.go | 22 +------ handlers/ws.go | 137 +++++++++++++++++++++++++++++++++++++++--- www/assets/app.js | 48 ++++++++++++++- www/index.html | 1 - 4 files changed, 177 insertions(+), 31 deletions(-) diff --git a/handlers/bootstrap.go b/handlers/bootstrap.go index 2438e1d..a183263 100644 --- a/handlers/bootstrap.go +++ b/handlers/bootstrap.go @@ -9,7 +9,6 @@ import ( "golang.org/x/crypto/acme/autocert" - "github.com/googollee/go-socket.io" gh "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/play-with-docker/play-with-docker/config" @@ -21,7 +20,6 @@ import ( var core pwd.PWDApi var e event.EventApi -var ws *socketio.Server type HandlerExtender func(h *mux.Router) @@ -31,15 +29,6 @@ func Bootstrap(c pwd.PWDApi, ev event.EventApi) { } func Register(extend HandlerExtender) { - server, err := socketio.NewServer(nil) - if err != nil { - log.Fatal(err) - } - server.On("connection", WS) - server.On("error", WSError) - - RegisterEvents(server) - r := mux.NewRouter() corsRouter := mux.NewRouter() @@ -71,7 +60,7 @@ func Register(extend HandlerExtender) { http.ServeFile(rw, r, "www/sdk.js") }) - corsRouter.Handle("/sessions/{sessionId}/ws/", server) + corsRouter.HandleFunc("/sessions/{sessionId}/ws/", WSH) r.Handle("/metrics", promhttp.Handler()) // Generic routes @@ -129,12 +118,3 @@ func Register(extend HandlerExtender) { } } - -func RegisterEvents(s *socketio.Server) { - ws = s - e.OnAny(broadcastEvent) -} - -func broadcastEvent(eventType event.EventType, sessionId string, args ...interface{}) { - ws.BroadcastTo(sessionId, eventType.String(), args...) -} diff --git a/handlers/ws.go b/handlers/ws.go index b41b490..f291070 100644 --- a/handlers/ws.go +++ b/handlers/ws.go @@ -1,14 +1,127 @@ package handlers import ( + "encoding/json" "fmt" "log" + "net/http" + "sync" "github.com/googollee/go-socket.io" "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/play-with-docker/play-with-docker/event" + "github.com/twinj/uuid" ) -func WS(so socketio.Socket) { +var upgrader = websocket.Upgrader{} + +type message struct { + Name string `json:"name"` + Args []interface{} `json:"args"` +} + +type socket struct { + c *websocket.Conn + mx sync.Mutex + listeners map[string][]func(args ...interface{}) + r *http.Request + id string +} + +func newSocket(r *http.Request, c *websocket.Conn) *socket { + return &socket{ + c: c, + listeners: map[string][]func(args ...interface{}){}, + r: r, + id: uuid.NewV4().String(), + } +} + +func (s *socket) Id() string { + return s.id +} + +func (s *socket) Request() *http.Request { + return s.r +} + +func (s *socket) process() { + defer s.onMessage(message{Name: "close"}) + for { + mt, m, err := s.c.ReadMessage() + if err != nil { + log.Printf("Error reading message from websocket. Got: %v\n", err) + break + } + if mt != websocket.TextMessage { + log.Printf("Received websocket message, but it is not a text message.\n") + continue + } + go func() { + var msg message + if err := json.Unmarshal(m, &msg); err != nil { + log.Printf("Cannot unmarshal message received from websocket. Got: %v\n", err) + return + } + s.onMessage(msg) + }() + } +} + +func (s *socket) onMessage(msg message) { + s.mx.Lock() + defer s.mx.Unlock() + + cbs, found := s.listeners[msg.Name] + if !found { + return + } + for _, cb := range cbs { + go cb(msg.Args...) + } +} + +func (s *socket) Emit(ev string, args ...interface{}) { + s.mx.Lock() + defer s.mx.Unlock() + m := message{Name: ev, Args: args} + b, err := json.Marshal(m) + if err != nil { + log.Printf("Cannot marshal event to json. Got: %v\n", err) + return + } + if err := s.c.WriteMessage(websocket.TextMessage, b); err != nil { + log.Printf("Cannot write event to websocket connection. Got: %v\n", err) + return + } +} + +func (s *socket) On(ev string, cb func(args ...interface{})) { + s.mx.Lock() + defer s.mx.Unlock() + listeners, found := s.listeners[ev] + if !found { + listeners = []func(args ...interface{}){} + } + listeners = append(listeners, cb) + s.listeners[ev] = listeners +} + +func WSH(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Print("upgrade:", err) + return + } + defer c.Close() + + s := newSocket(r, c) + ws(s) + s.process() +} + +func ws(so *socket) { defer func() { if r := recover(); r != nil { fmt.Println("Recovered from ", r) @@ -26,8 +139,6 @@ func WS(so socketio.Socket) { client := core.ClientNew(so.Id(), session) - so.Join(session.Id) - m, err := NewManager(session) if err != nil { log.Printf("Error creating terminal manager. Got: %v", err) @@ -47,24 +158,34 @@ func WS(so socketio.Socket) { return } - so.On("session close", func() { + so.On("session close", func(args ...interface{}) { m.Close() core.SessionClose(session) }) - so.On("instance terminal in", func(name, data string) { + so.On("instance terminal in", func(args ...interface{}) { + name := args[0].(string) + data := args[1].(string) m.Send(name, []byte(data)) }) - so.On("instance viewport resize", func(cols, rows uint) { + so.On("instance viewport resize", func(args ...interface{}) { // User resized his viewport - core.ClientResizeViewPort(client, cols, rows) + cols := args[0].(float64) + rows := args[1].(float64) + core.ClientResizeViewPort(client, uint(cols), uint(rows)) }) - so.On("disconnection", func() { + so.On("close", func(args ...interface{}) { m.Close() core.ClientClose(client) }) + + e.OnAny(func(eventType event.EventType, sessionId string, args ...interface{}) { + if session.Id == sessionId { + so.Emit(eventType.String(), args...) + } + }) } func WSError(so socketio.Socket) { diff --git a/www/assets/app.js b/www/assets/app.js index ad1f584..d5a583c 100644 --- a/www/assets/app.js +++ b/www/assets/app.js @@ -179,7 +179,53 @@ $scope.idxByHostname[instance.hostname] = instance; } - var socket = io({ path: '/sessions/' + sessionId + '/ws' }); + var base = ''; + if (window.location.protocol == 'http:') { + base = 'ws://'; + } else { + base = 'wss://'; + } + base += window.location.host; + if (window.location.port) { + base += ':' + window.location.port; + } + + var socket = new WebSocket(base + '/sessions/' + sessionId + '/ws/'); + socket.listeners = {}; + + socket.on = function(name, cb) { + if (!socket.listeners[name]) { + socket.listeners[name] = []; + } + socket.listeners[name].push(cb); + } + + socket.emit = function() { + var name = arguments[0] + var args = []; + for (var i = 1; i < arguments.length; i++) { + args.push(arguments[i]); + } + socket.send(JSON.stringify({name: name, args: args})); + } + + socket.addEventListener('open', function (event) { + console.log('open', event); + }); + socket.addEventListener('close', function (event) { + console.log('close', event); + }); + socket.addEventListener('message', function (event) { + var m = JSON.parse(event.data); + var ls = socket.listeners[m.name]; + if (ls) { + for (var i=0; i -