diff --git a/docker/docker.go b/docker/docker.go index 0935fbc..bb586e7 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -51,6 +51,8 @@ type DockerApi interface { Exec(instanceName string, command []string) (int, error) SwarmInit() (*SwarmTokens, error) SwarmJoin(addr, token string) error + ConfigCreate(name string, labels map[string]string, data []byte) error + ConfigDelete(name string) error } type SwarmTokens struct { @@ -62,6 +64,18 @@ type docker struct { c *client.Client } +func (d *docker) ConfigCreate(name string, labels map[string]string, data []byte) error { + config := swarm.ConfigSpec{} + config.Name = name + config.Labels = labels + config.Data = data + _, err := d.c.ConfigCreate(context.Background(), config) + return err +} +func (d *docker) ConfigDelete(name string) error { + return d.c.ConfigRemove(context.Background(), name) +} + func (d *docker) CreateNetwork(id string, opts types.NetworkCreate) error { _, err := d.c.NetworkCreate(context.Background(), id, opts) diff --git a/docker/mock.go b/docker/mock.go index b218573..8d22217 100644 --- a/docker/mock.go +++ b/docker/mock.go @@ -104,6 +104,14 @@ func (m *Mock) SwarmJoin(addr, token string) error { args := m.Called(addr, token) return args.Error(0) } +func (m *Mock) ConfigCreate(name string, labels map[string]string, data []byte) error { + args := m.Called(name, labels, data) + return args.Error(0) +} +func (m *Mock) ConfigDelete(name string) error { + args := m.Called(name) + return args.Error(0) +} type MockConn struct { } diff --git a/handlers/terms.go b/handlers/terms.go new file mode 100644 index 0000000..cb1e9cd --- /dev/null +++ b/handlers/terms.go @@ -0,0 +1,186 @@ +package handlers + +import ( + "log" + "net" + "sync" + + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" + + "golang.org/x/text/encoding" +) + +type terminal struct { + conn net.Conn + write chan []byte + instance *types.Instance +} + +func (t *terminal) Go(ch chan info, ech chan *types.Instance) { + go func() { + for d := range t.write { + _, err := t.conn.Write(d) + if err != nil { + ech <- t.instance + return + } + } + }() + go func() { + encoder := encoding.Replacement.NewEncoder() + buf := make([]byte, 1024) + for { + n, err := t.conn.Read(buf) + if err != nil { + ech <- t.instance + return + } + b, err := encoder.Bytes(buf[:n]) + if err != nil { + ech <- t.instance + return + } + ch <- info{name: t.instance.Name, data: b} + } + }() +} + +type info struct { + name string + data []byte +} +type manager struct { + sendCh chan info + receiveCh chan info + terminals map[string]terminal + errorCh chan *types.Instance + instances map[string]*types.Instance + sync.Mutex +} + +func (m *manager) Send(name string, data []byte) { + m.sendCh <- info{name: name, data: data} +} +func (m *manager) Receive(cb func(name string, data []byte)) { + for i := range m.receiveCh { + cb(i.name, i.data) + } +} +func (m *manager) connect(instance *types.Instance) error { + if !m.trackingInstance(instance) { + return nil + } + + conn, err := core.InstanceGetTerminal(instance) + if err != nil { + return err + } + chw := make(chan []byte, 10) + t := terminal{conn: conn, write: chw, instance: instance} + m.terminals[instance.Name] = t + t.Go(m.receiveCh, m.errorCh) + return nil +} + +func (m *manager) trackInstance(instance *types.Instance) { + m.Lock() + defer m.Unlock() + + m.instances[instance.Name] = instance + +} +func (m *manager) untrackInstance(instance *types.Instance) { + m.Lock() + defer m.Unlock() + + delete(m.instances, instance.Name) +} +func (m *manager) trackingInstance(instance *types.Instance) bool { + m.Lock() + defer m.Unlock() + _, found := m.instances[instance.Name] + + return found +} + +func (m *manager) disconnect(instance *types.Instance) { + if !m.trackingInstance(instance) { + return + } + + t := m.terminals[instance.Name] + close(t.write) + t.conn.Close() + m.untrackInstance(instance) +} + +func (m *manager) process() { + for { + select { + case i := <-m.sendCh: + t := m.terminals[i.name] + t.write <- i.data + case instance := <-m.errorCh: + log.Println("reconnecting") + m.connect(instance) + } + } +} +func (m *manager) Close() { + for _, i := range m.instances { + m.disconnect(i) + } +} + +func NewManager(s *types.Session) (*manager, error) { + m := &manager{ + sendCh: make(chan info), + receiveCh: make(chan info), + terminals: make(map[string]terminal), + errorCh: make(chan *types.Instance), + instances: make(map[string]*types.Instance), + } + + instances, err := core.InstanceFindBySession(s) + if err != nil { + return nil, err + } + for _, i := range instances { + m.instances[i.Name] = i + m.connect(i) + } + e.On(event.INSTANCE_NEW, func(sessionId string, args ...interface{}) { + if sessionId != s.Id { + return + } + + // There is a new instance in a session we are tracking. We should track it's terminal + instanceName := args[0].(string) + instance := core.InstanceGet(s, instanceName) + if instance == nil { + log.Printf("Instance [%s] was not found in session [%s]\n", instanceName, sessionId) + return + } + m.trackInstance(instance) + m.connect(instance) + }) + + e.On(event.INSTANCE_DELETE, func(sessionId string, args ...interface{}) { + if sessionId != s.Id { + return + } + + // There is a new instance in a session we are tracking. We should track it's terminal + instanceName := args[0].(string) + instance := core.InstanceGet(s, instanceName) + if instance == nil { + log.Printf("Instance [%s] was not found in session [%s]\n", instanceName, sessionId) + return + } + m.disconnect(instance) + }) + + go m.process() + return m, nil +} diff --git a/handlers/ws.go b/handlers/ws.go index e91b110..b9ee774 100644 --- a/handlers/ws.go +++ b/handlers/ws.go @@ -3,15 +3,9 @@ package handlers import ( "fmt" "log" - "net" - "sync" - - "golang.org/x/text/encoding" "github.com/googollee/go-socket.io" "github.com/gorilla/mux" - "github.com/play-with-docker/play-with-docker/event" - "github.com/play-with-docker/play-with-docker/pwd/types" ) func WS(so socketio.Socket) { @@ -30,88 +24,27 @@ func WS(so socketio.Socket) { return } - so.Join(session.Id) - - instances, err := core.InstanceFindBySession(session) - if err != nil { - log.Printf("Couldn't find instances for session with id [%s]. Got: %v\n", sessionId, err) - return - } - var rw sync.Mutex - trackedTerminals := make(map[string]net.Conn, len(instances)) - - attachTerminalToSocket := func(instance *types.Instance, ws socketio.Socket) { - rw.Lock() - defer rw.Unlock() - if _, found := trackedTerminals[instance.Name]; found { - return - } - conn, err := core.InstanceGetTerminal(instance) - if err != nil { - log.Println(err) - return - } - trackedTerminals[instance.Name] = conn - - go func(instanceName string, c net.Conn, ws socketio.Socket) { - defer c.Close() - defer func() { - rw.Lock() - defer rw.Unlock() - delete(trackedTerminals, instanceName) - }() - encoder := encoding.Replacement.NewEncoder() - buf := make([]byte, 1024) - for { - n, err := c.Read(buf) - if err != nil { - log.Println(err) - return - } - b, err := encoder.Bytes(buf[:n]) - if err != nil { - log.Println(err) - return - } - ws.Emit("instance terminal out", instanceName, string(b)) - } - }(instance.Name, conn, ws) - } - // since this is a new connection, get all terminals of the session and attach - for _, instance := range instances { - attachTerminalToSocket(instance, so) - } - - e.On(event.INSTANCE_NEW, func(sessionId string, args ...interface{}) { - if sessionId != session.Id { - return - } - - // There is a new instance in a session we are tracking. We should track it's terminal - instanceName := args[0].(string) - instance := core.InstanceGet(session, instanceName) - if instance == nil { - log.Printf("Instance [%s] was not found in session [%s]\n", instanceName, sessionId) - return - } - attachTerminalToSocket(instance, so) - }) - client := core.ClientNew(so.Id(), session) + so.Join(session.Id) + + m, err := NewManager(session) + if err != nil { + log.Printf("Error creating terminal manager. Got: %v", err) + return + } + + go m.Receive(func(name string, data []byte) { + so.Emit("instance terminal out", name, string(data)) + }) + so.On("session close", func() { + m.Close() core.SessionClose(session) }) so.On("instance terminal in", func(name, data string) { - rw.Lock() - defer rw.Unlock() - conn, found := trackedTerminals[name] - if !found { - log.Printf("Could not find instance [%s] in session [%s]\n", name, sessionId) - return - } - go conn.Write([]byte(data)) + m.Send(name, []byte(data)) }) so.On("instance viewport resize", func(cols, rows uint) { @@ -120,6 +53,7 @@ func WS(so socketio.Socket) { }) so.On("disconnection", func() { + m.Close() core.ClientClose(client) }) } diff --git a/provisioner/windows.go b/provisioner/windows.go index 74ddb21..06b79e3 100644 --- a/provisioner/windows.go +++ b/provisioner/windows.go @@ -6,14 +6,16 @@ import ( "io" "log" "net" + "net/http" + "net/url" "sort" - "strings" + + "golang.org/x/net/websocket" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/ec2" - "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/play-with-docker/play-with-docker/router" @@ -48,60 +50,31 @@ func NewWindowsASG(f docker.FactoryApi, st storage.StorageApi) *windows { } func (d *windows) InstanceNew(session *types.Session, conf types.InstanceConfig) (*types.Instance, error) { - - conf.ImageName = config.GetSSHImage() - winfo, err := d.getWindowsInstanceInfo(session.Id) if err != nil { return nil, err } - if conf.Hostname == "" { - instances, err := d.storage.InstanceFindBySessionId(session.Id) - if err != nil { - return nil, err - } - var nodeName string - for i := 1; ; i++ { - nodeName = fmt.Sprintf("node%d", i) - exists := checkHostnameExists(session.Id, nodeName, instances) - if !exists { - break - } - } - conf.Hostname = nodeName - } - - containerName := fmt.Sprintf("%s_%s", session.Id[:8], conf.Hostname) - opts := docker.CreateContainerOpts{ - Image: conf.ImageName, - WindowsEndpoint: winfo.privateIP, - SessionId: session.Id, - PwdIpAddress: session.PwdIpAddress, - ContainerName: containerName, - Hostname: conf.Hostname, - ServerCert: conf.ServerCert, - ServerKey: conf.ServerKey, - CACert: conf.CACert, - Privileged: false, - HostFQDN: conf.Host, - Networks: []string{session.Id}, + labels := map[string]string{ + "io.tutorius.networkid": session.Id, + "io.tutorius.networking.remote.ip": winfo.privateIP, } + instanceName := fmt.Sprintf("%s_%s", session.Id[:8], winfo.id) dockerClient, err := d.factory.GetForSession(session.Id) if err != nil { d.releaseInstance(winfo.id) return nil, err } - if err = dockerClient.CreateContainer(opts); err != nil { + if err = dockerClient.ConfigCreate(instanceName, labels, []byte(instanceName)); err != nil { d.releaseInstance(winfo.id) return nil, err } instance := &types.Instance{} - instance.Name = containerName - instance.Image = opts.Image + instance.Name = instanceName + instance.Image = "" instance.IP = winfo.privateIP instance.RoutableIP = instance.IP instance.SessionId = session.Id @@ -115,31 +88,6 @@ func (d *windows) InstanceNew(session *types.Session, conf types.InstanceConfig) instance.ProxyHost = router.EncodeHost(session.Id, instance.RoutableIP, router.HostOpts{}) instance.SessionHost = session.Host - if cli, err := d.factory.GetForInstance(instance); err != nil { - if derr := d.InstanceDelete(session, instance); derr != nil { - log.Println("Error deleting instance: ", derr) - } - return nil, err - } else { - info, err := cli.GetDaemonInfo() - if err != nil { - if derr := d.InstanceDelete(session, instance); derr != nil { - log.Println("Error deleting instance: ", derr) - } - return nil, err - } - instance.Hostname = info.Name - instance.Name = fmt.Sprintf("%s_%s", session.Id[:8], info.Name) - if err = dockerClient.ContainerRename(containerName, instance.Name); err != nil { - // revert instance name to remove ssh container - instance.Name = containerName - if derr := d.InstanceDelete(session, instance); derr != nil { - log.Println("Error deleting instance: ", derr) - } - return nil, err - } - } - return instance, nil } @@ -160,13 +108,13 @@ func (d *windows) InstanceDelete(session *types.Session, instance *types.Instanc return err } - // return error and don't do anything else + //return error and don't do anything else if _, err := ec2Service.TerminateInstances(&ec2.TerminateInstancesInput{InstanceIds: []*string{aws.String(instance.WindowsId)}}); err != nil { return err } - err = dockerClient.DeleteContainer(instance.Name) - if err != nil && !strings.Contains(err.Error(), "No such container") { + err = dockerClient.ConfigDelete(instance.Name) + if err != nil { return err } @@ -178,27 +126,68 @@ func (d *windows) releaseInstance(instanceId string) error { } func (d *windows) InstanceResizeTerminal(instance *types.Instance, rows, cols uint) error { - dockerClient, err := d.factory.GetForSession(instance.SessionId) + resp, err := http.Post(fmt.Sprintf("http://%s:222/terminals/1/size?cols=%d&rows=%d", instance.IP, cols, rows), "application/json", nil) if err != nil { + log.Println(err) return err } - return dockerClient.ContainerResize(instance.Name, rows, cols) + if resp.StatusCode != 200 { + log.Printf("Error resizing terminal of instance %s. Got %d\n", instance.Name, resp.StatusCode) + return fmt.Errorf("Error resizing terminal got %d\n", resp.StatusCode) + } + return nil } func (d *windows) InstanceGetTerminal(instance *types.Instance) (net.Conn, error) { - dockerClient, err := d.factory.GetForSession(instance.SessionId) + resp, err := http.Post(fmt.Sprintf("http://%s:222/terminals/1", instance.IP), "application/json", nil) if err != nil { + log.Printf("Error creating terminal for instance %s. Got %v\n", instance.Name, err) return nil, err } - return dockerClient.CreateAttachConnection(instance.Name) + if resp.StatusCode != 200 { + log.Printf("Error creating terminal for instance %s. Got %d\n", instance.Name, resp.StatusCode) + return nil, fmt.Errorf("Creating terminal got %d\n", resp.StatusCode) + } + url := fmt.Sprintf("ws://%s:222/terminals/1", instance.IP) + ws, err := websocket.Dial(url, "", url) + if err != nil { + log.Println(err) + return nil, err + } + return ws, nil } -func (d *windows) InstanceUploadFromUrl(instance *types.Instance, fileName, dest, url string) error { - return fmt.Errorf("Not implemented") +func (d *windows) InstanceUploadFromUrl(instance *types.Instance, fileName, dest, u string) error { + log.Printf("Downloading file [%s]\n", u) + resp, err := http.Get(u) + if err != nil { + return fmt.Errorf("Could not download file [%s]. Error: %s\n", u, err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("Could not download file [%s]. Status code: %d\n", u, resp.StatusCode) + } + uploadResp, err := http.Post(fmt.Sprintf("http://%s:222/terminals/1/uploads?dest=%s&file_name=%s", instance.IP, url.QueryEscape(dest), url.QueryEscape(fileName)), "", resp.Body) + if err != nil { + return err + } + if uploadResp.StatusCode != 200 { + return fmt.Errorf("Could not upload file [%s]. Status code: %d\n", fileName, uploadResp.StatusCode) + } + + return nil } func (d *windows) InstanceUploadFromReader(instance *types.Instance, fileName, dest string, reader io.Reader) error { - return fmt.Errorf("Not implemented") + uploadResp, err := http.Post(fmt.Sprintf("http://%s:222/terminals/1/uploads?dest=%s&file_name=%s", instance.IP, url.QueryEscape(dest), url.QueryEscape(fileName)), "", reader) + if err != nil { + return err + } + if uploadResp.StatusCode != 200 { + return fmt.Errorf("Could not upload file [%s]. Status code: %d\n", fileName, uploadResp.StatusCode) + } + + return nil } func (d *windows) getWindowsInstanceInfo(sessionId string) (*instanceInfo, error) { diff --git a/scheduler/task/collect_stats.go b/scheduler/task/collect_stats.go index a9c6fb8..50eb16d 100644 --- a/scheduler/task/collect_stats.go +++ b/scheduler/task/collect_stats.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "net/http" dockerTypes "github.com/docker/docker/api/types" units "github.com/docker/go-units" @@ -35,6 +36,29 @@ func (t *collectStats) Name() string { } func (t *collectStats) Run(ctx context.Context, instance *types.Instance) error { + if instance.Type == "windows" { + resp, err := http.Get(fmt.Sprintf("http://%s:222/stats", instance.IP)) + if err != nil { + log.Printf("Could not get stats of windows instance with IP %s. Got: %v\n", instance.IP, err) + return fmt.Errorf("Could not get stats of windows instance with IP %s. Got: %v\n", instance.IP, err) + } + if resp.StatusCode != 200 { + log.Printf("Could not get stats of windows instance with IP %s. Got status code: %d\n", instance.IP, resp.StatusCode) + return fmt.Errorf("Could not get stats of windows instance with IP %s. Got status code: %d\n", instance.IP, resp.StatusCode) + } + var info map[string]float64 + err = json.NewDecoder(resp.Body).Decode(&info) + if err != nil { + log.Printf("Could not get stats of windows instance with IP %s. Got: %v\n", instance.IP, err) + return fmt.Errorf("Could not get stats of windows instance with IP %s. Got: %v\n", instance.IP, err) + } + stats := InstanceStats{Instance: instance.Name} + + stats.Mem = fmt.Sprintf("%.2f%% (%s / %s)", ((info["mem_used"] / info["mem_total"]) * 100), units.BytesSize(info["mem_used"]), units.BytesSize(info["mem_total"])) + stats.Cpu = fmt.Sprintf("%.2f%%", info["cpu"]*100) + t.event.Emit(CollectStatsEvent, instance.SessionId, stats) + return nil + } dockerClient, err := t.factory.GetForSession(instance.SessionId) if err != nil { log.Println(err)