Files
play-with-docker/handlers/ws.go
2017-11-01 14:09:58 -03:00

207 lines
4.1 KiB
Go

package handlers
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/play-with-docker/play-with-docker/event"
"github.com/twinj/uuid"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
type message struct {
Name string `json:"name"`
Args []interface{} `json:"args"`
}
type socket struct {
c *websocket.Conn
mx sync.Mutex
listeners map[string][]func(args ...interface{})
r *http.Request
id string
closed bool
}
func newSocket(r *http.Request, c *websocket.Conn) *socket {
return &socket{
c: c,
listeners: map[string][]func(args ...interface{}){},
r: r,
id: uuid.NewV4().String(),
}
}
func (s *socket) Id() string {
return s.id
}
func (s *socket) Request() *http.Request {
return s.r
}
func (s *socket) Close() {
s.closed = true
s.onMessage(message{Name: "close"})
}
func (s *socket) process() {
defer s.Close()
for {
mt, m, err := s.c.ReadMessage()
if err != nil {
log.Printf("Error reading message from websocket. Got: %v\n", err)
break
}
if mt != websocket.TextMessage {
log.Printf("Received websocket message, but it is not a text message.\n")
continue
}
go func() {
var msg message
if err := json.Unmarshal(m, &msg); err != nil {
log.Printf("Cannot unmarshal message received from websocket. Got: %v\n", err)
return
}
s.onMessage(msg)
}()
}
}
func (s *socket) onMessage(msg message) {
s.mx.Lock()
defer s.mx.Unlock()
cbs, found := s.listeners[msg.Name]
if !found {
return
}
for _, cb := range cbs {
go cb(msg.Args...)
}
}
func (s *socket) Emit(ev string, args ...interface{}) {
s.mx.Lock()
defer s.mx.Unlock()
if s.closed {
return
}
m := message{Name: ev, Args: args}
b, err := json.Marshal(m)
if err != nil {
log.Printf("Cannot marshal event to json. Got: %v\n", err)
return
}
if err := s.c.WriteMessage(websocket.TextMessage, b); err != nil {
log.Printf("Cannot write event to websocket connection. Got: %v\n", err)
s.Close()
return
}
}
func (s *socket) On(ev string, cb func(args ...interface{})) {
s.mx.Lock()
defer s.mx.Unlock()
listeners, found := s.listeners[ev]
if !found {
listeners = []func(args ...interface{}){}
}
listeners = append(listeners, cb)
s.listeners[ev] = listeners
}
func WSH(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()
s := newSocket(r, c)
ws(s)
s.process()
}
func ws(so *socket) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from ", r)
}
}()
vars := mux.Vars(so.Request())
sessionId := vars["sessionId"]
session := core.SessionGet(sessionId)
if session == nil {
log.Printf("Session with id [%s] does not exist!\n", sessionId)
return
}
client := core.ClientNew(so.Id(), session)
m, err := NewManager(session)
if err != nil {
log.Printf("Error creating terminal manager. Got: %v", err)
return
}
go m.Receive(func(name string, data []byte) {
so.Emit("instance terminal out", name, string(data))
})
go m.Status(func(name, status string) {
so.Emit("instance terminal status", name, status)
})
err = m.Start()
if err != nil {
log.Println(err)
return
}
so.On("session close", func(args ...interface{}) {
m.Close()
core.SessionClose(session)
})
so.On("instance terminal in", func(args ...interface{}) {
if len(args) == 2 && args[0] != nil && args[1] != nil {
name := args[0].(string)
data := args[1].(string)
m.Send(name, []byte(data))
}
})
so.On("instance viewport resize", func(args ...interface{}) {
if len(args) == 2 && args[0] != nil && args[1] != nil {
// User resized his viewport
cols := args[0].(float64)
rows := args[1].(float64)
core.ClientResizeViewPort(client, uint(cols), uint(rows))
}
})
so.On("close", func(args ...interface{}) {
m.Close()
core.ClientClose(client)
})
e.OnAny(func(eventType event.EventType, sessionId string, args ...interface{}) {
if session.Id == sessionId {
so.Emit(eventType.String(), args...)
}
})
}