diff --git a/docker/docker.go b/docker/docker.go index 08876c7..054874e 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -4,27 +4,23 @@ import ( "archive/tar" "bytes" "context" - "crypto/tls" "fmt" "io" "io/ioutil" "log" "net" - "net/http" "os" "strconv" "strings" "time" "github.com/docker/distribution/reference" - "github.com/docker/docker/api" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/client" "github.com/docker/docker/pkg/jsonmessage" - "github.com/docker/go-connections/tlsconfig" ) const ( @@ -49,7 +45,6 @@ type DockerApi interface { DisconnectNetwork(containerId, networkId string) error DeleteNetwork(id string) error Exec(instanceName string, command []string) (int, error) - New(ip string, cert, key []byte) (DockerApi, error) SwarmInit() (*SwarmTokens, error) SwarmJoin(addr, token string) error } @@ -411,6 +406,7 @@ func (d *docker) DeleteNetwork(id string) error { return nil } +/* func (d *docker) New(ip string, cert, key []byte) (DockerApi, error) { // We check if the client needs to use TLS var tlsConfig *tls.Config @@ -454,7 +450,7 @@ func (d *docker) New(ip string, cert, key []byte) (DockerApi, error) { } return NewDocker(c), nil } - +*/ func (d *docker) SwarmInit() (*SwarmTokens, error) { req := swarm.InitRequest{AdvertiseAddr: "eth0", ListenAddr: "0.0.0.0:2377"} _, err := d.c.SwarmInit(context.Background(), req) diff --git a/docker/factory.go b/docker/factory.go new file mode 100644 index 0000000..4fc6ef8 --- /dev/null +++ b/docker/factory.go @@ -0,0 +1,6 @@ +package docker + +type FactoryApi interface { + GetForSession(sessionId string) (DockerApi, error) + GetForInstance(sessionId, instanceName string) (DockerApi, error) +} diff --git a/docker/factory_mock.go b/docker/factory_mock.go new file mode 100644 index 0000000..d44987c --- /dev/null +++ b/docker/factory_mock.go @@ -0,0 +1,17 @@ +package docker + +import "github.com/stretchr/testify/mock" + +type FactoryMock struct { + mock.Mock +} + +func (m *FactoryMock) GetForSession(sessionId string) (DockerApi, error) { + args := m.Called(sessionId) + return args.Get(0).(DockerApi), args.Error(1) +} + +func (m *FactoryMock) GetForInstance(sessionId, instanceName string) (DockerApi, error) { + args := m.Called(sessionId, instanceName) + return args.Get(0).(DockerApi), args.Error(1) +} diff --git a/docker/local_cached_factory.go b/docker/local_cached_factory.go new file mode 100644 index 0000000..2b54612 --- /dev/null +++ b/docker/local_cached_factory.go @@ -0,0 +1,120 @@ +package docker + +import ( + "context" + "crypto/tls" + "fmt" + "log" + "net" + "net/http" + "sync" + "time" + + "github.com/docker/docker/api" + "github.com/docker/docker/client" + "github.com/docker/go-connections/tlsconfig" + "github.com/play-with-docker/play-with-docker/router" + "github.com/play-with-docker/play-with-docker/storage" +) + +type localCachedFactory struct { + rw sync.Mutex + sessionClient DockerApi + instanceClients map[string]DockerApi + storage storage.StorageApi +} + +func (f *localCachedFactory) GetForSession(sessionId string) (DockerApi, error) { + f.rw.Lock() + defer f.rw.Unlock() + + if f.sessionClient != nil { + return f.sessionClient, nil + } + + c, err := client.NewEnvClient() + if err != nil { + return nil, err + } + err = f.check(c) + if err != nil { + return nil, err + } + d := NewDocker(c) + f.sessionClient = d + return f.sessionClient, nil +} + +func (f *localCachedFactory) GetForInstance(sessionId, instanceName string) (DockerApi, error) { + f.rw.Lock() + defer f.rw.Unlock() + + c, found := f.instanceClients[sessionId+instanceName] + if found { + return c, nil + } + + instance, err := f.storage.InstanceFind(sessionId, instanceName) + if err != nil { + return nil, err + } + // Need to create client to the DinD docker daemon + // We check if the client needs to use TLS + var tlsConfig *tls.Config + if len(instance.Cert) > 0 && len(instance.Key) > 0 { + tlsConfig = tlsconfig.ClientDefault() + tlsConfig.InsecureSkipVerify = true + tlsCert, err := tls.X509KeyPair(instance.Cert, instance.Key) + if err != nil { + return nil, fmt.Errorf("Could not load X509 key pair: %v. Make sure the key is not encrypted", err) + } + tlsConfig.Certificates = []tls.Certificate{tlsCert} + } + + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 1 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext} + if tlsConfig != nil { + transport.TLSClientConfig = tlsConfig + } + cli := &http.Client{ + Transport: transport, + } + dc, err := client.NewClient(fmt.Sprintf("http://%s:443", instance.Session.Host), api.DefaultVersion, cli, map[string]string{"Host": router.EncodeHost(instance.SessionId, instance.IP, router.HostOpts{EncodedPort: 2375})}) + if err != nil { + return nil, fmt.Errorf("Could not connect to DinD docker daemon", err) + } + err = f.check(dc) + if err != nil { + return nil, err + } + f.instanceClients[sessionId+instance.Name] = NewDocker(dc) + + return f.instanceClients[instance.Name], nil +} + +func (f *localCachedFactory) check(c *client.Client) error { + for i := 0; i < 5; i++ { + _, err := c.Ping(context.Background()) + if err != nil { + if client.IsErrConnectionFailed(err) { + // connection has failed, maybe instance is not ready yet, sleep and retry + log.Printf("Connection to [%s] has failed, maybe instance is not ready yet, sleeping and retrying in 1 second. Try #%d\n", c.DaemonHost(), i+1) + time.Sleep(time.Second) + continue + } + return err + } + break + } + return nil +} + +func NewLocalCachedFactory(s storage.StorageApi) *localCachedFactory { + return &localCachedFactory{ + instanceClients: make(map[string]DockerApi), + storage: s, + } +} diff --git a/docker/mock.go b/docker/mock.go new file mode 100644 index 0000000..3304cf8 --- /dev/null +++ b/docker/mock.go @@ -0,0 +1,122 @@ +package docker + +import ( + "io" + "net" + "time" + + "github.com/docker/docker/api/types" + "github.com/stretchr/testify/mock" +) + +type Mock struct { + mock.Mock +} + +func (m *Mock) CreateNetwork(id string) error { + args := m.Called(id) + return args.Error(0) +} + +func (m *Mock) ConnectNetwork(container, network, ip string) (string, error) { + args := m.Called(container, network, ip) + return args.String(0), args.Error(1) +} + +func (m *Mock) GetDaemonInfo() (types.Info, error) { + args := m.Called() + return args.Get(0).(types.Info), args.Error(1) +} + +func (m *Mock) GetSwarmPorts() ([]string, []uint16, error) { + args := m.Called() + return args.Get(0).([]string), args.Get(1).([]uint16), args.Error(2) +} + +func (m *Mock) GetPorts() ([]uint16, error) { + args := m.Called() + return args.Get(0).([]uint16), args.Error(1) +} +func (m *Mock) GetContainerStats(name string) (io.ReadCloser, error) { + args := m.Called(name) + return args.Get(0).(io.ReadCloser), args.Error(1) +} +func (m *Mock) ContainerResize(name string, rows, cols uint) error { + args := m.Called(name, rows, cols) + return args.Error(0) +} +func (m *Mock) CreateAttachConnection(name string) (net.Conn, error) { + args := m.Called(name) + return args.Get(0).(net.Conn), args.Error(1) +} +func (m *Mock) CopyToContainer(containerName, destination, fileName string, content io.Reader) error { + args := m.Called(containerName, destination, fileName, content) + return args.Error(0) +} +func (m *Mock) DeleteContainer(id string) error { + args := m.Called(id) + return args.Error(0) +} +func (m *Mock) CreateContainer(opts CreateContainerOpts) (string, error) { + args := m.Called(opts) + return args.String(0), args.Error(1) +} +func (m *Mock) ExecAttach(instanceName string, command []string, out io.Writer) (int, error) { + args := m.Called(instanceName, command, out) + return args.Int(0), args.Error(1) +} +func (m *Mock) DisconnectNetwork(containerId, networkId string) error { + args := m.Called(containerId, networkId) + return args.Error(0) +} +func (m *Mock) DeleteNetwork(id string) error { + args := m.Called(id) + return args.Error(0) +} +func (m *Mock) Exec(instanceName string, command []string) (int, error) { + args := m.Called(instanceName, command) + return args.Int(0), args.Error(1) +} +func (m *Mock) SwarmInit() (*SwarmTokens, error) { + args := m.Called() + return args.Get(0).(*SwarmTokens), args.Error(1) +} +func (m *Mock) SwarmJoin(addr, token string) error { + args := m.Called(addr, token) + return args.Error(0) +} + +type MockConn struct { +} + +func (m *MockConn) Read(b []byte) (n int, err error) { + return len(b), nil +} + +func (m *MockConn) Write(b []byte) (n int, err error) { + return len(b), nil +} + +func (m *MockConn) Close() error { + return nil +} + +func (m *MockConn) LocalAddr() net.Addr { + return &net.IPAddr{} +} + +func (m *MockConn) RemoteAddr() net.Addr { + return &net.IPAddr{} +} + +func (m *MockConn) SetDeadline(t time.Time) error { + return nil +} + +func (m *MockConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (m *MockConn) SetWriteDeadline(t time.Time) error { + return nil +} diff --git a/event/event.go b/event/event.go index 1a94766..c956767 100644 --- a/event/event.go +++ b/event/event.go @@ -22,12 +22,16 @@ func FindEventType(name string) (EventType, bool) { return EventType(-1), false } +func NewEventType(name string) EventType { + return ciota(name) +} + var ( INSTANCE_VIEWPORT_RESIZE = ciota("instance viewport resize") INSTANCE_DELETE = ciota("instance delete") INSTANCE_NEW = ciota("instance new") INSTANCE_STATS = ciota("instance stats") - INSTANCE_TERMINAL_OUT = ciota("instance terminal out") + SESSION_NEW = ciota("session new") SESSION_END = ciota("session end") SESSION_READY = ciota("session ready") SESSION_BUILDER_OUT = ciota("session builder out") diff --git a/event/mock.go b/event/mock.go new file mode 100644 index 0000000..136fb5d --- /dev/null +++ b/event/mock.go @@ -0,0 +1,19 @@ +package event + +import "github.com/stretchr/testify/mock" + +type Mock struct { + M mock.Mock +} + +func (m *Mock) Emit(name EventType, sessionId string, args ...interface{}) { + m.M.Called(name, sessionId, args) +} + +func (m *Mock) On(name EventType, handler Handler) { + m.M.Called(name, handler) +} + +func (m *Mock) OnAny(handler AnyHandler) { + m.M.Called(handler) +} diff --git a/handlers/bootstrap.go b/handlers/bootstrap.go index be32d59..489e65f 100644 --- a/handlers/bootstrap.go +++ b/handlers/bootstrap.go @@ -6,9 +6,10 @@ import ( "github.com/googollee/go-socket.io" "github.com/play-with-docker/play-with-docker/config" + "github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/event" - "github.com/play-with-docker/play-with-docker/provider" "github.com/play-with-docker/play-with-docker/pwd" + "github.com/play-with-docker/play-with-docker/scheduler" "github.com/play-with-docker/play-with-docker/storage" ) @@ -17,19 +18,18 @@ var e event.EventApi var ws *socketio.Server func Bootstrap() { - sp := provider.NewLocalSessionProvider() - - e = event.NewLocalBroker() - - t := pwd.NewScheduler(e, sp) s, err := storage.NewFileStorage(config.SessionsFile) + e = event.NewLocalBroker() + + f := docker.NewLocalCachedFactory(s) if err != nil && !os.IsNotExist(err) { log.Fatal("Error initializing StorageAPI: ", err) } - core = pwd.NewPWD(sp, t, e, s) + core = pwd.NewPWD(f, e, s) + scheduler.NewScheduler(s, e, core) } func RegisterEvents(s *socketio.Server) { diff --git a/handlers/sshproxy.go b/handlers/sshproxy.go deleted file mode 100644 index 68d9d8f..0000000 --- a/handlers/sshproxy.go +++ /dev/null @@ -1,206 +0,0 @@ -package handlers - -import ( - "fmt" - "io" - "io/ioutil" - "log" - "net" - "strings" - "sync" - - "golang.org/x/crypto/ssh" -) - -var sshConfig = &ssh.ServerConfig{ - PublicKeyCallback: func(c ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) { - user := c.User() - chunks := strings.Split(user, "-") - ip := strings.Join(chunks[:4], ".") - sessionPrefix := chunks[4] - - log.Println(ip, sessionPrefix) - - return nil, nil - }, -} - -func ListenSSHProxy(laddr string) { - privateBytes, err := ioutil.ReadFile("/etc/ssh/ssh_host_rsa_key") - if err != nil { - log.Fatal("Failed to load private key: ", err) - } - - private, err := ssh.ParsePrivateKey(privateBytes) - if err != nil { - log.Fatal("Failed to parse private key: ", err) - } - - sshConfig.AddHostKey(private) - - listener, err := net.Listen("tcp", laddr) - if err != nil { - log.Fatal("failed to listen for connection: ", err) - } - for { - nConn, err := listener.Accept() - if err != nil { - log.Fatal("failed to accept incoming connection: ", err) - } - - go handle(nConn) - } -} - -func handle(c net.Conn) { - sshCon, chans, reqs, err := ssh.NewServerConn(c, sshConfig) - if err != nil { - c.Close() - return - } - - user := sshCon.User() - chunks := strings.Split(user, "-") - ip := strings.Join(chunks[:4], ".") - sessionPrefix := chunks[4] - - i := core.InstanceFindByIPAndSession(sessionPrefix, ip) - if i == nil { - log.Printf("Couldn't find instance with ip [%s] in session [%s]\n", ip, sessionPrefix) - c.Close() - return - } - - // The incoming Request channel must be serviced. - go ssh.DiscardRequests(reqs) - - newChannel := <-chans - if newChannel == nil { - sshCon.Close() - return - } - - if newChannel.ChannelType() != "session" { - newChannel.Reject(ssh.UnknownChannelType, "unknown channel type") - return - } - - channel, requests, err := newChannel.Accept() - if err != nil { - log.Fatalf("Could not accept channel: %v", err) - } - - stderr := channel.Stderr() - - fmt.Fprintf(stderr, "Connecting to %s\r\n", ip) - - clientConfig := &ssh.ClientConfig{ - User: "root", - Auth: []ssh.AuthMethod{ - ssh.Password("root"), - }, - HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error { - return nil - }, - } - - client, err := ssh.Dial("tcp", fmt.Sprintf("%s:22", ip), clientConfig) - if err != nil { - fmt.Fprintf(stderr, "Connect failed: %v\r\n", err) - channel.Close() - return - } - - go func() { - for newChannel = range chans { - if newChannel == nil { - return - } - - channel2, reqs2, err := client.OpenChannel(newChannel.ChannelType(), newChannel.ExtraData()) - if err != nil { - x, ok := err.(*ssh.OpenChannelError) - if ok { - newChannel.Reject(x.Reason, x.Message) - } else { - newChannel.Reject(ssh.Prohibited, "remote server denied channel request") - } - continue - } - - channel, reqs, err := newChannel.Accept() - if err != nil { - channel2.Close() - continue - } - go proxy(reqs, reqs2, channel, channel2) - } - }() - - // Forward the session channel - channel2, reqs2, err := client.OpenChannel("session", []byte{}) - if err != nil { - fmt.Fprintf(stderr, "Remote session setup failed: %v\r\n", err) - channel.Close() - return - } - - maskedReqs := make(chan *ssh.Request, 1) - go func() { - for req := range requests { - if req.Type == "auth-agent-req@openssh.com" { - continue - } - maskedReqs <- req - } - }() - proxy(maskedReqs, reqs2, channel, channel2) -} - -func proxy(reqs1, reqs2 <-chan *ssh.Request, channel1, channel2 ssh.Channel) { - var closer sync.Once - closeFunc := func() { - channel1.Close() - channel2.Close() - } - - defer closer.Do(closeFunc) - - closerChan := make(chan bool, 1) - - go func() { - io.Copy(channel1, channel2) - closerChan <- true - }() - - go func() { - io.Copy(channel2, channel1) - closerChan <- true - }() - - for { - select { - case req := <-reqs1: - if req == nil { - return - } - b, err := channel2.SendRequest(req.Type, req.WantReply, req.Payload) - if err != nil { - return - } - req.Reply(b, nil) - - case req := <-reqs2: - if req == nil { - return - } - b, err := channel1.SendRequest(req.Type, req.WantReply, req.Payload) - if err != nil { - return - } - req.Reply(b, nil) - case <-closerChan: - return - } - } -} diff --git a/handlers/ws.go b/handlers/ws.go index 65eb6d2..9a97859 100644 --- a/handlers/ws.go +++ b/handlers/ws.go @@ -3,9 +3,15 @@ package handlers import ( "fmt" "log" + "net" + "sync" + + "golang.org/x/text/encoding" "github.com/googollee/go-socket.io" "github.com/gorilla/mux" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" ) func WS(so socketio.Socket) { @@ -26,6 +32,61 @@ func WS(so socketio.Socket) { so.Join(session.Id) + var rw sync.Mutex + trackedTerminals := make(map[string]net.Conn, len(session.Instances)) + + attachTerminalToSocket := func(instance *types.Instance, ws socketio.Socket) { + rw.Lock() + defer rw.Unlock() + if _, found := trackedTerminals[instance.Name]; found { + return + } + conn, err := core.InstanceGetTerminal(instance) + if err != nil { + log.Println(err) + return + } + trackedTerminals[instance.Name] = conn + + go func(instanceName string, c net.Conn, ws socketio.Socket) { + defer c.Close() + encoder := encoding.Replacement.NewEncoder() + buf := make([]byte, 1024) + for { + n, err := c.Read(buf) + if err != nil { + log.Println(err) + return + } + b, err := encoder.Bytes(buf[:n]) + if err != nil { + log.Println(err) + return + } + ws.Emit("instance terminal out", instanceName, b) + } + }(instance.Name, conn, ws) + } + // since this is a new connection, get all terminals of the session and attach + for _, instance := range session.Instances { + attachTerminalToSocket(instance, so) + } + + e.On(event.INSTANCE_NEW, func(sessionId string, args ...interface{}) { + if sessionId != session.Id { + return + } + + // There is a new instance in a session we are tracking. We should track it's terminal + instanceName := args[0].(string) + instance := core.InstanceGet(session, instanceName) + if instance == nil { + log.Printf("Instance [%s] was not found in session [%s]\n", instanceName, sessionId) + return + } + attachTerminalToSocket(instance, so) + }) + client := core.ClientNew(so.Id(), session) so.On("session close", func() { @@ -33,8 +94,14 @@ func WS(so socketio.Socket) { }) so.On("instance terminal in", func(name, data string) { - // User wrote something on the terminal. Need to write it to the instance terminal - core.InstanceWriteToTerminal(session.Id, name, data) + rw.Lock() + defer rw.Unlock() + conn, found := trackedTerminals[name] + if !found { + log.Printf("Could not find instance [%s] in session [%s]\n", name, sessionId) + return + } + go conn.Write([]byte(data)) }) so.On("instance viewport resize", func(cols, rows uint) { diff --git a/provider/local_session_provider.go b/provider/local_session_provider.go deleted file mode 100644 index bf1936f..0000000 --- a/provider/local_session_provider.go +++ /dev/null @@ -1,36 +0,0 @@ -package provider - -import ( - "sync" - - "github.com/docker/docker/client" - "github.com/play-with-docker/play-with-docker/docker" -) - -type localSessionProvider struct { - rw sync.Mutex - - docker docker.DockerApi -} - -func (p *localSessionProvider) GetDocker(sessionId string) (docker.DockerApi, error) { - p.rw.Lock() - defer p.rw.Unlock() - - if p.docker != nil { - return p.docker, nil - } - - c, err := client.NewEnvClient() - if err != nil { - return nil, err - } - d := docker.NewDocker(c) - - p.docker = d - return d, nil -} - -func NewLocalSessionProvider() *localSessionProvider { - return &localSessionProvider{} -} diff --git a/provider/provider.go b/provider/provider.go deleted file mode 100644 index 7c99152..0000000 --- a/provider/provider.go +++ /dev/null @@ -1,10 +0,0 @@ -package provider - -import "github.com/play-with-docker/play-with-docker/docker" - -type InstanceProvider interface { -} - -type SessionProvider interface { - GetDocker(sessionId string) (docker.DockerApi, error) -} diff --git a/pwd/check_swarm_status_task.go b/pwd/check_swarm_status_task.go deleted file mode 100644 index 8f0ee94..0000000 --- a/pwd/check_swarm_status_task.go +++ /dev/null @@ -1,28 +0,0 @@ -package pwd - -import ( - "log" - - "github.com/docker/docker/api/types/swarm" - "github.com/play-with-docker/play-with-docker/pwd/types" -) - -type checkSwarmStatusTask struct { -} - -func (c checkSwarmStatusTask) Run(i *types.Instance) error { - if i.Docker == nil { - return nil - } - if info, err := i.Docker.GetDaemonInfo(); err == nil { - if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked { - i.IsManager = &info.Swarm.ControlAvailable - } else { - i.IsManager = nil - } - } else { - log.Println(err) - return err - } - return nil -} diff --git a/pwd/check_swarm_used_ports.go b/pwd/check_swarm_used_ports.go deleted file mode 100644 index a62dd05..0000000 --- a/pwd/check_swarm_used_ports.go +++ /dev/null @@ -1,35 +0,0 @@ -package pwd - -import ( - "fmt" - "log" - - "github.com/play-with-docker/play-with-docker/pwd/types" -) - -type checkSwarmUsedPortsTask struct { -} - -func (c checkSwarmUsedPortsTask) Run(i *types.Instance) error { - if i.Docker == nil { - return nil - } - if i.IsManager != nil && *i.IsManager { - sessionPrefix := i.Session.Id[:8] - // This is a swarm manager instance, then check for ports - if hosts, ports, err := i.Docker.GetSwarmPorts(); err != nil { - log.Println(err) - return err - } else { - for _, host := range hosts { - host = fmt.Sprintf("%s_%s", sessionPrefix, host) - for _, port := range ports { - if i.Session.Instances[host] != nil { - i.Session.Instances[host].SetUsedPort(port) - } - } - } - } - } - return nil -} diff --git a/pwd/check_used_ports_task.go b/pwd/check_used_ports_task.go deleted file mode 100644 index 83ffb89..0000000 --- a/pwd/check_used_ports_task.go +++ /dev/null @@ -1,25 +0,0 @@ -package pwd - -import ( - "log" - - "github.com/play-with-docker/play-with-docker/pwd/types" -) - -type checkUsedPortsTask struct { -} - -func (c checkUsedPortsTask) Run(i *types.Instance) error { - if i.Docker == nil { - return nil - } - if ports, err := i.Docker.GetPorts(); err == nil { - for _, p := range ports { - i.SetUsedPort(uint16(p)) - } - } else { - log.Println(err) - return err - } - return nil -} diff --git a/pwd/client_test.go b/pwd/client_test.go index cf687dd..df4ab30 100644 --- a/pwd/client_test.go +++ b/pwd/client_test.go @@ -1,23 +1,38 @@ package pwd import ( - "sync" "testing" "time" + "github.com/play-with-docker/play-with-docker/config" + "github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/event" "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/play-with-docker/play-with-docker/storage" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestClientNew(t *testing.T) { - d := &mockDocker{} - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} - sp := &mockSessionProvider{docker: d} + _s := &storage.Mock{} + _f := &docker.FactoryMock{} + _g := &mockGenerator{} + _d := &docker.Mock{} + _e := &event.Mock{} - p := NewPWD(sp, tasks, e, storage) + _g.On("NewId").Return("aaaabbbbcccc") + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) + _d.On("CreateNetwork", "aaaabbbbcccc").Return(nil) + _d.On("ConnectNetwork", config.L2ContainerName, "aaaabbbbcccc", "").Return("10.0.0.1", nil) + _s.On("SessionPut", mock.AnythingOfType("*types.Session")).Return(nil) + _s.On("SessionCount").Return(1, nil) + _s.On("InstanceCount").Return(0, nil) + + var nilArgs []interface{} + _e.M.On("Emit", event.SESSION_NEW, "aaaabbbbcccc", nilArgs).Return() + + p := NewPWD(_f, _e, _s) + p.generator = _g session, err := p.SessionNew(time.Hour, "", "", "") assert.Nil(t, err) @@ -26,15 +41,33 @@ func TestClientNew(t *testing.T) { assert.Equal(t, types.Client{Id: "foobar", Session: session, ViewPort: types.ViewPort{Cols: 0, Rows: 0}}, *client) assert.Contains(t, session.Clients, client) -} -func TestClientCount(t *testing.T) { - d := &mockDocker{} - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} - sp := &mockSessionProvider{docker: d} - p := NewPWD(sp, tasks, e, storage) + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) +} + +func TestClientCount(t *testing.T) { + _s := &storage.Mock{} + _f := &docker.FactoryMock{} + _g := &mockGenerator{} + _d := &docker.Mock{} + _e := &event.Mock{} + + _g.On("NewId").Return("aaaabbbbcccc") + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) + _d.On("CreateNetwork", "aaaabbbbcccc").Return(nil) + _d.On("ConnectNetwork", config.L2ContainerName, "aaaabbbbcccc", "").Return("10.0.0.1", nil) + _s.On("SessionPut", mock.AnythingOfType("*types.Session")).Return(nil) + _s.On("SessionCount").Return(1, nil) + _s.On("InstanceCount").Return(-1, nil) + var nilArgs []interface{} + _e.M.On("Emit", event.SESSION_NEW, "aaaabbbbcccc", nilArgs).Return() + + p := NewPWD(_f, _e, _s) + p.generator = _g session, err := p.SessionNew(time.Hour, "", "", "") assert.Nil(t, err) @@ -42,38 +75,46 @@ func TestClientCount(t *testing.T) { p.ClientNew("foobar", session) assert.Equal(t, 1, p.ClientCount()) + + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) } func TestClientResizeViewPort(t *testing.T) { - wg := sync.WaitGroup{} - wg.Add(1) - d := &mockDocker{} - tasks := &mockTasks{} - e := event.NewLocalBroker() - sp := &mockSessionProvider{docker: d} + _s := &storage.Mock{} + _f := &docker.FactoryMock{} + _g := &mockGenerator{} + _d := &docker.Mock{} + _e := &event.Mock{} - broadcastedSessionId := "" - broadcastedArgs := []interface{}{} + _g.On("NewId").Return("aaaabbbbcccc") + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) + _d.On("CreateNetwork", "aaaabbbbcccc").Return(nil) + _d.On("ConnectNetwork", config.L2ContainerName, "aaaabbbbcccc", "").Return("10.0.0.1", nil) + _s.On("SessionPut", mock.AnythingOfType("*types.Session")).Return(nil) + _s.On("SessionCount").Return(1, nil) + _s.On("InstanceCount").Return(0, nil) + var nilArgs []interface{} + _e.M.On("Emit", event.SESSION_NEW, "aaaabbbbcccc", nilArgs).Return() - e.On(event.INSTANCE_VIEWPORT_RESIZE, func(sessionId string, args ...interface{}) { - broadcastedSessionId = sessionId - broadcastedArgs = args - wg.Done() - }) - - storage := &mockStorage{} - - p := NewPWD(sp, tasks, e, storage) + _e.M.On("Emit", event.INSTANCE_VIEWPORT_RESIZE, "aaaabbbbcccc", []interface{}{uint(80), uint(24)}).Return() + p := NewPWD(_f, _e, _s) + p.generator = _g session, err := p.SessionNew(time.Hour, "", "", "") assert.Nil(t, err) client := p.ClientNew("foobar", session) p.ClientResizeViewPort(client, 80, 24) - wg.Wait() assert.Equal(t, types.ViewPort{Cols: 80, Rows: 24}, client.ViewPort) - assert.Equal(t, session.Id, broadcastedSessionId) - assert.Equal(t, uint(80), broadcastedArgs[0]) - assert.Equal(t, uint(24), broadcastedArgs[1]) + + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) } diff --git a/pwd/collect_stats_task.go b/pwd/collect_stats_task.go deleted file mode 100644 index 4eb36d8..0000000 --- a/pwd/collect_stats_task.go +++ /dev/null @@ -1,71 +0,0 @@ -package pwd - -import ( - "encoding/json" - "fmt" - "log" - - dockerTypes "github.com/docker/docker/api/types" - units "github.com/docker/go-units" - "github.com/play-with-docker/play-with-docker/provider" - "github.com/play-with-docker/play-with-docker/pwd/types" -) - -type collectStatsTask struct { - mem float64 - memLimit float64 - memPercent float64 - - cpuPercent float64 - previousCPU uint64 - previousSystem uint64 - - sessionProvider provider.SessionProvider -} - -func (c collectStatsTask) Run(i *types.Instance) error { - docker, _ := c.sessionProvider.GetDocker(i.SessionId) - reader, err := docker.GetContainerStats(i.Name) - if err != nil { - log.Println("Error while trying to collect instance stats", err) - return err - } - dec := json.NewDecoder(reader) - var v *dockerTypes.StatsJSON - e := dec.Decode(&v) - if e != nil { - log.Println("Error while trying to collect instance stats", e) - return err - } - // Memory - if v.MemoryStats.Limit != 0 { - c.memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 - } - c.mem = float64(v.MemoryStats.Usage) - c.memLimit = float64(v.MemoryStats.Limit) - - i.Mem = fmt.Sprintf("%.2f%% (%s / %s)", c.memPercent, units.BytesSize(c.mem), units.BytesSize(c.memLimit)) - - // cpu - c.previousCPU = v.PreCPUStats.CPUUsage.TotalUsage - c.previousSystem = v.PreCPUStats.SystemUsage - c.cpuPercent = calculateCPUPercentUnix(c.previousCPU, c.previousSystem, v) - i.Cpu = fmt.Sprintf("%.2f%%", c.cpuPercent) - - return nil -} - -func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *dockerTypes.StatsJSON) float64 { - var ( - cpuPercent = 0.0 - // calculate the change for the cpu usage of the container in between readings - cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU) - // calculate the change for the entire system between readings - systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem) - ) - - if systemDelta > 0.0 && cpuDelta > 0.0 { - cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0 - } - return cpuPercent -} diff --git a/pwd/docker_mock_test.go b/pwd/docker_mock_test.go deleted file mode 100644 index 6bf8494..0000000 --- a/pwd/docker_mock_test.go +++ /dev/null @@ -1,141 +0,0 @@ -package pwd - -import ( - "io" - "net" - "time" - - "github.com/docker/docker/api/types" - "github.com/play-with-docker/play-with-docker/docker" -) - -type mockDocker struct { - createNetwork func(string) error - connectNetwork func(container, network, ip string) (string, error) - containerResize func(string, uint, uint) error - createContainer func(opts docker.CreateContainerOpts) (string, error) - execAttach func(instanceName string, command []string, out io.Writer) (int, error) - new func(ip string, cert, key []byte) (docker.DockerApi, error) - swarmInit func() (*docker.SwarmTokens, error) - swarmJoin func(addr, token string) error - createAttachConnection func(name string) (net.Conn, error) -} - -func (m *mockDocker) CreateNetwork(id string) error { - if m.createNetwork == nil { - return nil - } - return m.createNetwork(id) -} -func (m *mockDocker) ConnectNetwork(container, network, ip string) (string, error) { - if m.connectNetwork == nil { - return "10.0.0.1", nil - } - return m.connectNetwork(container, network, ip) -} - -func (m *mockDocker) GetDaemonInfo() (types.Info, error) { - return types.Info{}, nil -} - -func (m *mockDocker) GetSwarmPorts() ([]string, []uint16, error) { - return []string{}, []uint16{}, nil -} -func (m *mockDocker) GetPorts() ([]uint16, error) { - return []uint16{}, nil -} -func (m *mockDocker) GetContainerStats(name string) (io.ReadCloser, error) { - return nil, nil -} -func (m *mockDocker) ContainerResize(name string, rows, cols uint) error { - if m.containerResize != nil { - return m.containerResize(name, rows, cols) - } - return nil -} -func (m *mockDocker) CreateAttachConnection(name string) (net.Conn, error) { - if m.createAttachConnection != nil { - return m.createAttachConnection(name) - } - return &mockConn{}, nil -} -func (m *mockDocker) CopyToContainer(containerName, destination, fileName string, content io.Reader) error { - return nil -} -func (m *mockDocker) DeleteContainer(id string) error { - return nil -} -func (m *mockDocker) CreateContainer(opts docker.CreateContainerOpts) (string, error) { - if m.createContainer != nil { - return m.createContainer(opts) - } - return "10.0.0.1", nil -} -func (m *mockDocker) ExecAttach(instanceName string, command []string, out io.Writer) (int, error) { - if m.execAttach != nil { - return m.execAttach(instanceName, command, out) - } - return 0, nil -} -func (m *mockDocker) DisconnectNetwork(containerId, networkId string) error { - return nil -} -func (m *mockDocker) DeleteNetwork(id string) error { - return nil -} -func (m *mockDocker) Exec(instanceName string, command []string) (int, error) { - return 0, nil -} -func (m *mockDocker) New(ip string, cert, key []byte) (docker.DockerApi, error) { - if m.new != nil { - return m.new(ip, cert, key) - } - return nil, nil -} -func (m *mockDocker) SwarmInit() (*docker.SwarmTokens, error) { - if m.swarmInit != nil { - return m.swarmInit() - } - return nil, nil -} -func (m *mockDocker) SwarmJoin(addr, token string) error { - if m.swarmJoin != nil { - return m.swarmJoin(addr, token) - } - return nil -} - -type mockConn struct { -} - -func (m *mockConn) Read(b []byte) (n int, err error) { - return len(b), nil -} - -func (m *mockConn) Write(b []byte) (n int, err error) { - return len(b), nil -} - -func (m *mockConn) Close() error { - return nil -} - -func (m *mockConn) LocalAddr() net.Addr { - return &net.IPAddr{} -} - -func (m *mockConn) RemoteAddr() net.Addr { - return &net.IPAddr{} -} - -func (m *mockConn) SetDeadline(t time.Time) error { - return nil -} - -func (m *mockConn) SetReadDeadline(t time.Time) error { - return nil -} - -func (m *mockConn) SetWriteDeadline(t time.Time) error { - return nil -} diff --git a/pwd/instance.go b/pwd/instance.go index edebd0e..c84ef8b 100644 --- a/pwd/instance.go +++ b/pwd/instance.go @@ -16,26 +16,10 @@ import ( "github.com/play-with-docker/play-with-docker/event" "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/play-with-docker/play-with-docker/router" - - "golang.org/x/text/encoding" ) -type sessionWriter struct { - sessionId string - instanceName string - event event.EventApi -} - -var terms = make(map[string]map[string]net.Conn) - -func (s *sessionWriter) Write(p []byte) (n int, err error) { - s.event.Emit(event.INSTANCE_TERMINAL_OUT, s.sessionId, s.instanceName, string(p)) - return len(p), nil -} - type InstanceConfig struct { ImageName string - Alias string Hostname string ServerCert []byte ServerKey []byte @@ -50,27 +34,15 @@ func (p *pwd) InstanceResizeTerminal(instance *types.Instance, rows, cols uint) return p.docker(instance.SessionId).ContainerResize(instance.Name, rows, cols) } -func (p *pwd) InstanceAttachTerminal(instance *types.Instance) error { - // already have a connection for this instance - if getInstanceTermConn(instance.SessionId, instance.Name) != nil { - return nil - } +func (p *pwd) InstanceGetTerminal(instance *types.Instance) (net.Conn, error) { + defer observeAction("InstanceGetTerminal", time.Now()) conn, err := p.docker(instance.SessionId).CreateAttachConnection(instance.Name) if err != nil { - return err + return nil, err } - encoder := encoding.Replacement.NewEncoder() - sw := &sessionWriter{sessionId: instance.Session.Id, instanceName: instance.Name, event: p.event} - if terms[instance.SessionId] == nil { - terms[instance.SessionId] = map[string]net.Conn{instance.Name: conn} - } else { - terms[instance.SessionId][instance.Name] = conn - } - io.Copy(encoder.Writer(sw), conn) - - return nil + return conn, nil } func (p *pwd) InstanceUploadFromUrl(instance *types.Instance, fileName, dest string, url string) error { @@ -137,9 +109,9 @@ func (p *pwd) InstanceGet(session *types.Session, name string) *types.Instance { return session.Instances[name] } -func (p *pwd) InstanceFindByIP(ip string) *types.Instance { - defer observeAction("InstanceFindByIP", time.Now()) - i, err := p.storage.InstanceFindByIP(ip) +func (p *pwd) InstanceFind(sessionId, ip string) *types.Instance { + defer observeAction("InstanceFind", time.Now()) + i, err := p.storage.InstanceFind(sessionId, ip) if err != nil { return nil } @@ -147,31 +119,9 @@ func (p *pwd) InstanceFindByIP(ip string) *types.Instance { return i } -func (p *pwd) InstanceFindByIPAndSession(sessionPrefix, ip string) *types.Instance { - defer observeAction("InstanceFindByIPAndSession", time.Now()) - i, err := p.storage.InstanceFindByIPAndSession(sessionPrefix, ip) - if err != nil { - return nil - } - - return i -} - -func (p *pwd) InstanceFindByAlias(sessionPrefix, alias string) *types.Instance { - defer observeAction("InstanceFindByAlias", time.Now()) - i, err := p.storage.InstanceFindByAlias(sessionPrefix, alias) - if err != nil { - return nil - } - return i -} - func (p *pwd) InstanceDelete(session *types.Session, instance *types.Instance) error { defer observeAction("InstanceDelete", time.Now()) - conn := getInstanceTermConn(session.Id, instance.Name) - if conn != nil { - conn.Close() - } + err := p.docker(session.Id).DeleteContainer(instance.Name) if err != nil && !strings.Contains(err.Error(), "No such container") { log.Println(err) @@ -255,7 +205,6 @@ func (p *pwd) InstanceNew(session *types.Session, conf InstanceConfig) (*types.I instance.SessionId = session.Id instance.Name = containerName instance.Hostname = conf.Hostname - instance.Alias = conf.Alias instance.Cert = conf.Cert instance.Key = conf.Key instance.ServerCert = conf.ServerCert @@ -271,7 +220,7 @@ func (p *pwd) InstanceNew(session *types.Session, conf InstanceConfig) (*types.I } session.Instances[instance.Name] = instance - go p.InstanceAttachTerminal(instance) + // go p.InstanceAttachTerminal(instance) err = p.storage.InstanceCreate(session.Id, instance) if err != nil { @@ -285,14 +234,6 @@ func (p *pwd) InstanceNew(session *types.Session, conf InstanceConfig) (*types.I return instance, nil } -func (p *pwd) InstanceWriteToTerminal(sessionId, instanceName string, data string) { - defer observeAction("InstanceWriteToTerminal", time.Now()) - conn := getInstanceTermConn(sessionId, instanceName) - if conn != nil && len(data) > 0 { - conn.Write([]byte(data)) - } -} - func (p *pwd) InstanceAllowedImages() []string { defer observeAction("InstanceAllowedImages", time.Now()) @@ -308,7 +249,3 @@ func (p *pwd) InstanceExec(instance *types.Instance, cmd []string) (int, error) defer observeAction("InstanceExec", time.Now()) return p.docker(instance.SessionId).Exec(instance.Name, cmd) } - -func getInstanceTermConn(sessionId, instanceName string) net.Conn { - return terms[sessionId][instanceName] -} diff --git a/pwd/instance_test.go b/pwd/instance_test.go index 804ac49..3475bc4 100644 --- a/pwd/instance_test.go +++ b/pwd/instance_test.go @@ -1,10 +1,7 @@ package pwd import ( - "errors" "fmt" - "net" - "sync" "testing" "time" @@ -13,75 +10,67 @@ import ( "github.com/play-with-docker/play-with-docker/event" "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/play-with-docker/play-with-docker/router" + "github.com/play-with-docker/play-with-docker/storage" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestInstanceResizeTerminal(t *testing.T) { - resizedInstanceName := "" - resizedRows := uint(0) - resizedCols := uint(0) + _d := &docker.Mock{} + _f := &docker.FactoryMock{} + _s := &storage.Mock{} + _g := &mockGenerator{} + _e := &event.Mock{} - docker := &mockDocker{} - docker.containerResize = func(name string, rows, cols uint) error { - resizedInstanceName = name - resizedRows = rows - resizedCols = cols + _d.On("ContainerResize", "foobar", uint(24), uint(80)).Return(nil) + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) - return nil - } - sp := &mockSessionProvider{docker: docker} - - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} - - p := NewPWD(sp, tasks, e, storage) - - err := p.InstanceResizeTerminal(&types.Instance{Name: "foobar"}, 24, 80) + p := NewPWD(_f, _e, _s) + err := p.InstanceResizeTerminal(&types.Instance{Name: "foobar", SessionId: "aaaabbbbcccc"}, 24, 80) assert.Nil(t, err) - assert.Equal(t, "foobar", resizedInstanceName) - assert.Equal(t, uint(24), resizedRows) - assert.Equal(t, uint(80), resizedCols) + + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) } func TestInstanceNew(t *testing.T) { - containerOpts := docker.CreateContainerOpts{} - dock := &mockDocker{} - dock.createContainer = func(opts docker.CreateContainerOpts) (string, error) { - containerOpts = opts - return "10.0.0.1", nil - } - sp := &mockSessionProvider{docker: dock} + _d := &docker.Mock{} + _f := &docker.FactoryMock{} + _s := &storage.Mock{} + _g := &mockGenerator{} + _e := &event.Mock{} - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} + _g.On("NewId").Return("aaaabbbbcccc") + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) + _d.On("CreateNetwork", "aaaabbbbcccc").Return(nil) + _d.On("ConnectNetwork", config.L2ContainerName, "aaaabbbbcccc", "").Return("10.0.0.1", nil) + _s.On("SessionPut", mock.AnythingOfType("*types.Session")).Return(nil) + _s.On("SessionCount").Return(1, nil) + _s.On("InstanceCount").Return(0, nil) - p := NewPWD(sp, tasks, e, storage) + var nilArgs []interface{} + _e.M.On("Emit", event.SESSION_NEW, "aaaabbbbcccc", nilArgs).Return() + + p := NewPWD(_f, _e, _s) + p.generator = _g session, err := p.SessionNew(time.Hour, "", "", "") - - assert.Nil(t, err) - - instance, err := p.InstanceNew(session, InstanceConfig{Host: "something.play-with-docker.com"}) - assert.Nil(t, err) expectedInstance := types.Instance{ Name: fmt.Sprintf("%s_node1", session.Id[:8]), Hostname: "node1", IP: "10.0.0.1", - Alias: "", Image: config.GetDindImageName(), IsDockerHost: true, SessionId: session.Id, Session: session, Proxy: router.EncodeHost(session.Id, "10.0.0.1", router.HostOpts{}), } - - assert.Equal(t, expectedInstance, *instance) - expectedContainerOpts := docker.CreateContainerOpts{ Image: expectedInstance.Image, SessionId: session.Id, @@ -94,89 +83,57 @@ func TestInstanceNew(t *testing.T) { Privileged: true, HostFQDN: "something.play-with-docker.com", } - assert.Equal(t, expectedContainerOpts, containerOpts) -} - -func TestInstanceNew_Concurrency(t *testing.T) { - i := 0 - dock := &mockDocker{} - dock.createContainer = func(opts docker.CreateContainerOpts) (string, error) { - time.Sleep(time.Second) - i++ - return fmt.Sprintf("10.0.0.%d", i), nil - } - sp := &mockSessionProvider{docker: dock} - - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} - - p := NewPWD(sp, tasks, e, storage) - - session, err := p.SessionNew(time.Hour, "", "", "") + _d.On("CreateContainer", expectedContainerOpts).Return("10.0.0.1", nil) + _s.On("InstanceCreate", "aaaabbbbcccc", mock.AnythingOfType("*types.Instance")).Return(nil) + _e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_node1", "10.0.0.1", "node1"}).Return() + instance, err := p.InstanceNew(session, InstanceConfig{Host: "something.play-with-docker.com"}) assert.Nil(t, err) - var instance1 *types.Instance - var instance2 *types.Instance + assert.Equal(t, expectedInstance, *instance) - wg := sync.WaitGroup{} - wg.Add(2) - - go func() { - defer wg.Done() - instance, err := p.InstanceNew(session, InstanceConfig{}) - assert.Nil(t, err) - instance1 = instance - }() - go func() { - defer wg.Done() - instance, err := p.InstanceNew(session, InstanceConfig{}) - assert.Nil(t, err) - instance2 = instance - }() - wg.Wait() - - assert.Subset(t, []string{"node1", "node2"}, []string{instance1.Hostname, instance2.Hostname}) + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) } func TestInstanceNew_WithNotAllowedImage(t *testing.T) { - containerOpts := docker.CreateContainerOpts{} - dock := &mockDocker{} - dock.createContainer = func(opts docker.CreateContainerOpts) (string, error) { - containerOpts = opts - return "10.0.0.1", nil - } - sp := &mockSessionProvider{docker: dock} + _d := &docker.Mock{} + _f := &docker.FactoryMock{} + _s := &storage.Mock{} + _g := &mockGenerator{} + _e := &event.Mock{} - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} + _g.On("NewId").Return("aaaabbbbcccc") + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) + _d.On("CreateNetwork", "aaaabbbbcccc").Return(nil) + _d.On("ConnectNetwork", config.L2ContainerName, "aaaabbbbcccc", "").Return("10.0.0.1", nil) + _s.On("SessionPut", mock.AnythingOfType("*types.Session")).Return(nil) + _s.On("SessionCount").Return(1, nil) + _s.On("InstanceCount").Return(0, nil) - p := NewPWD(sp, tasks, e, storage) + var nilArgs []interface{} + _e.M.On("Emit", event.SESSION_NEW, "aaaabbbbcccc", nilArgs).Return() + + p := NewPWD(_f, _e, _s) + p.generator = _g session, err := p.SessionNew(time.Hour, "", "", "") assert.Nil(t, err) - instance, err := p.InstanceNew(session, InstanceConfig{ImageName: "redis"}) - - assert.Nil(t, err) - expectedInstance := types.Instance{ Name: fmt.Sprintf("%s_node1", session.Id[:8]), Hostname: "node1", IP: "10.0.0.1", - Alias: "", Image: "redis", SessionId: session.Id, IsDockerHost: false, Session: session, - Proxy: instance.Proxy, + Proxy: router.EncodeHost(session.Id, "10.0.0.1", router.HostOpts{}), } - - assert.Equal(t, expectedInstance, *instance) - expectedContainerOpts := docker.CreateContainerOpts{ Image: expectedInstance.Image, SessionId: session.Id, @@ -188,46 +145,56 @@ func TestInstanceNew_WithNotAllowedImage(t *testing.T) { CACert: nil, Privileged: false, } - assert.Equal(t, expectedContainerOpts, containerOpts) + _d.On("CreateContainer", expectedContainerOpts).Return("10.0.0.1", nil) + _s.On("InstanceCreate", "aaaabbbbcccc", mock.AnythingOfType("*types.Instance")).Return(nil) + _e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_node1", "10.0.0.1", "node1"}).Return() + + instance, err := p.InstanceNew(session, InstanceConfig{ImageName: "redis"}) + assert.Nil(t, err) + + assert.Equal(t, expectedInstance, *instance) + + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) } func TestInstanceNew_WithCustomHostname(t *testing.T) { - containerOpts := docker.CreateContainerOpts{} - dock := &mockDocker{} - dock.createContainer = func(opts docker.CreateContainerOpts) (string, error) { - containerOpts = opts - return "10.0.0.1", nil - } - sp := &mockSessionProvider{docker: dock} + _d := &docker.Mock{} + _f := &docker.FactoryMock{} + _s := &storage.Mock{} + _g := &mockGenerator{} + _e := &event.Mock{} - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} + _g.On("NewId").Return("aaaabbbbcccc") + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) + _d.On("CreateNetwork", "aaaabbbbcccc").Return(nil) + _d.On("ConnectNetwork", config.L2ContainerName, "aaaabbbbcccc", "").Return("10.0.0.1", nil) + _s.On("SessionPut", mock.AnythingOfType("*types.Session")).Return(nil) + _s.On("SessionCount").Return(1, nil) + _s.On("InstanceCount").Return(0, nil) - p := NewPWD(sp, tasks, e, storage) + var nilArgs []interface{} + _e.M.On("Emit", event.SESSION_NEW, "aaaabbbbcccc", nilArgs).Return() + + p := NewPWD(_f, _e, _s) + p.generator = _g session, err := p.SessionNew(time.Hour, "", "", "") - - assert.Nil(t, err) - - instance, err := p.InstanceNew(session, InstanceConfig{ImageName: "redis", Hostname: "redis-master"}) - assert.Nil(t, err) expectedInstance := types.Instance{ Name: fmt.Sprintf("%s_redis-master", session.Id[:8]), Hostname: "redis-master", IP: "10.0.0.1", - Alias: "", Image: "redis", IsDockerHost: false, Session: session, SessionId: session.Id, - Proxy: instance.Proxy, + Proxy: router.EncodeHost(session.Id, "10.0.0.1", router.HostOpts{}), } - - assert.Equal(t, expectedInstance, *instance) - expectedContainerOpts := docker.CreateContainerOpts{ Image: expectedInstance.Image, SessionId: session.Id, @@ -239,56 +206,32 @@ func TestInstanceNew_WithCustomHostname(t *testing.T) { CACert: nil, Privileged: false, } - assert.Equal(t, expectedContainerOpts, containerOpts) + + _d.On("CreateContainer", expectedContainerOpts).Return("10.0.0.1", nil) + _s.On("InstanceCreate", "aaaabbbbcccc", mock.AnythingOfType("*types.Instance")).Return(nil) + _e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_redis-master", "10.0.0.1", "redis-master"}).Return() + + instance, err := p.InstanceNew(session, InstanceConfig{ImageName: "redis", Hostname: "redis-master"}) + + assert.Nil(t, err) + + assert.Equal(t, expectedInstance, *instance) + + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) } func TestInstanceAllowedImages(t *testing.T) { - dock := &mockDocker{} - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} - sp := &mockSessionProvider{docker: dock} + _f := &docker.FactoryMock{} + _s := &storage.Mock{} + _e := &event.Mock{} - p := NewPWD(sp, tasks, e, storage) + p := NewPWD(_f, _e, _s) expectedImages := []string{config.GetDindImageName(), "franela/dind:overlay2-dev", "franela/ucp:2.4.1"} assert.Equal(t, expectedImages, p.InstanceAllowedImages()) } - -type errConn struct { - *mockConn -} - -func (ec errConn) Read(b []byte) (int, error) { - return 0, errors.New("Error") -} - -func TestTermConnAssignment(t *testing.T) { - dock := &mockDocker{} - tasks := &mockTasks{} - e := event.NewLocalBroker() - storage := &mockStorage{} - - dock.createAttachConnection = func(name string) (net.Conn, error) { - // return error connection to unlock the goroutine - return errConn{}, nil - } - sp := &mockSessionProvider{docker: dock} - - p := NewPWD(sp, tasks, e, storage) - session, _ := p.SessionNew(time.Hour, "", "", "") - mockInstance := &types.Instance{ - Name: fmt.Sprintf("%s_redis-master", session.Id[:8]), - Hostname: "redis-master", - IP: "10.0.0.1", - Alias: "", - SessionId: session.Id, - Image: "redis", - IsDockerHost: false, - Session: session, - } - p.InstanceAttachTerminal(mockInstance) - assert.NotNil(t, getInstanceTermConn(session.Id, mockInstance.Name)) - -} diff --git a/pwd/mock.go b/pwd/mock.go new file mode 100644 index 0000000..11dc9b4 --- /dev/null +++ b/pwd/mock.go @@ -0,0 +1,112 @@ +package pwd + +import ( + "io" + "net" + "time" + + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/stretchr/testify/mock" +) + +type Mock struct { + mock.Mock +} + +func (m *Mock) SessionNew(duration time.Duration, stack string, stackName, imageName string) (*types.Session, error) { + args := m.Called(duration, stack, stackName, imageName) + return args.Get(0).(*types.Session), args.Error(1) +} + +func (m *Mock) SessionClose(session *types.Session) error { + args := m.Called(session) + return args.Error(0) +} + +func (m *Mock) SessionGetSmallestViewPort(session *types.Session) types.ViewPort { + args := m.Called(session) + return args.Get(0).(types.ViewPort) +} + +func (m *Mock) SessionDeployStack(session *types.Session) error { + args := m.Called(session) + return args.Error(0) +} + +func (m *Mock) SessionGet(id string) *types.Session { + args := m.Called(id) + return args.Get(0).(*types.Session) +} + +func (m *Mock) SessionSetup(session *types.Session, conf SessionSetupConf) error { + args := m.Called(session, conf) + return args.Error(0) +} + +func (m *Mock) InstanceNew(session *types.Session, conf InstanceConfig) (*types.Instance, error) { + args := m.Called(session, conf) + return args.Get(0).(*types.Instance), args.Error(1) +} + +func (m *Mock) InstanceResizeTerminal(instance *types.Instance, cols, rows uint) error { + args := m.Called(instance, cols, rows) + return args.Error(0) +} + +func (m *Mock) InstanceGetTerminal(instance *types.Instance) (net.Conn, error) { + args := m.Called(instance) + return args.Get(0).(net.Conn), args.Error(1) +} + +func (m *Mock) InstanceUploadFromUrl(instance *types.Instance, fileName, dest, url string) error { + args := m.Called(instance, fileName, dest, url) + return args.Error(0) +} + +func (m *Mock) InstanceUploadFromReader(instance *types.Instance, fileName, dest string, reader io.Reader) error { + args := m.Called(instance, fileName, dest, reader) + return args.Error(0) +} + +func (m *Mock) InstanceGet(session *types.Session, name string) *types.Instance { + args := m.Called(session, name) + return args.Get(0).(*types.Instance) +} + +func (m *Mock) InstanceFind(session, ip string) *types.Instance { + args := m.Called(session, ip) + return args.Get(0).(*types.Instance) +} + +func (m *Mock) InstanceDelete(session *types.Session, instance *types.Instance) error { + args := m.Called(session, instance) + return args.Error(0) +} + +func (m *Mock) InstanceAllowedImages() []string { + args := m.Called() + return args.Get(0).([]string) +} + +func (m *Mock) InstanceExec(instance *types.Instance, cmd []string) (int, error) { + args := m.Called(instance, cmd) + return args.Int(0), args.Error(1) +} + +func (m *Mock) ClientNew(id string, session *types.Session) *types.Client { + args := m.Called(id, session) + return args.Get(0).(*types.Client) +} + +func (m *Mock) ClientResizeViewPort(client *types.Client, cols, rows uint) { + m.Called(client, cols, rows) +} + +func (m *Mock) ClientClose(client *types.Client) { + m.Called(client) +} + +func (m *Mock) ClientCount() int { + args := m.Called() + return args.Int(0) +} diff --git a/pwd/pwd.go b/pwd/pwd.go index d2b3842..557bb4c 100644 --- a/pwd/pwd.go +++ b/pwd/pwd.go @@ -2,14 +2,16 @@ package pwd import ( "io" + "net" "time" "github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/event" - "github.com/play-with-docker/play-with-docker/provider" "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/play-with-docker/play-with-docker/storage" "github.com/prometheus/client_golang/prometheus" + "github.com/rs/xid" + "github.com/stretchr/testify/mock" ) var ( @@ -45,11 +47,31 @@ func init() { } type pwd struct { - sessionProvider provider.SessionProvider - tasks SchedulerApi - event event.EventApi - storage storage.StorageApi - clientCount int32 + dockerFactory docker.FactoryApi + event event.EventApi + storage storage.StorageApi + generator IdGenerator + clientCount int32 +} + +type IdGenerator interface { + NewId() string +} + +type xidGenerator struct { +} + +func (x xidGenerator) NewId() string { + return xid.New().String() +} + +type mockGenerator struct { + mock.Mock +} + +func (m *mockGenerator) NewId() string { + args := m.Called() + return args.String(0) } type PWDApi interface { @@ -62,16 +84,13 @@ type PWDApi interface { InstanceNew(session *types.Session, conf InstanceConfig) (*types.Instance, error) InstanceResizeTerminal(instance *types.Instance, cols, rows uint) error - InstanceAttachTerminal(instance *types.Instance) error + InstanceGetTerminal(instance *types.Instance) (net.Conn, error) InstanceUploadFromUrl(instance *types.Instance, fileName, dest, url string) error InstanceUploadFromReader(instance *types.Instance, fileName, dest string, reader io.Reader) error InstanceGet(session *types.Session, name string) *types.Instance // TODO remove this function when we add the session prefix to the PWD url - InstanceFindByIP(ip string) *types.Instance - InstanceFindByAlias(sessionPrefix, alias string) *types.Instance - InstanceFindByIPAndSession(sessionPrefix, ip string) *types.Instance + InstanceFind(sessionId, ip string) *types.Instance InstanceDelete(session *types.Session, instance *types.Instance) error - InstanceWriteToTerminal(sessionId, instanceName string, data string) InstanceAllowedImages() []string InstanceExec(instance *types.Instance, cmd []string) (int, error) @@ -81,12 +100,12 @@ type PWDApi interface { ClientCount() int } -func NewPWD(sp provider.SessionProvider, t SchedulerApi, e event.EventApi, s storage.StorageApi) *pwd { - return &pwd{sessionProvider: sp, tasks: t, event: e, storage: s} +func NewPWD(f docker.FactoryApi, e event.EventApi, s storage.StorageApi) *pwd { + return &pwd{dockerFactory: f, event: e, storage: s, generator: xidGenerator{}} } func (p *pwd) docker(sessionId string) docker.DockerApi { - d, err := p.sessionProvider.GetDocker(sessionId) + d, err := p.dockerFactory.GetForSession(sessionId) if err != nil { panic("Should not have got here. Session always need to be validated before calling this.") } diff --git a/pwd/session.go b/pwd/session.go index 65d26fa..6165b1c 100644 --- a/pwd/session.go +++ b/pwd/session.go @@ -14,7 +14,6 @@ import ( "github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/event" "github.com/play-with-docker/play-with-docker/pwd/types" - "github.com/rs/xid" ) var preparedSessions = map[string]bool{} @@ -44,7 +43,7 @@ func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName str defer observeAction("SessionNew", time.Now()) s := &types.Session{} - s.Id = xid.New().String() + s.Id = p.generator.NewId() s.Instances = map[string]*types.Instance{} s.CreatedAt = time.Now() s.ExpiresAt = s.CreatedAt.Add(duration) @@ -68,8 +67,7 @@ func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName str } log.Printf("Network [%s] created for session [%s]\n", s.Id, s.Id) - if _, err := p.prepareSession(s); err != nil { - log.Println(err) + if err := p.connectToNetwork(s); err != nil { return nil, err } @@ -79,6 +77,7 @@ func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName str } p.setGauges() + p.event.Emit(event.SESSION_NEW, s.Id) return s, nil } @@ -89,9 +88,6 @@ func (p *pwd) SessionClose(s *types.Session) error { s.Lock() defer s.Unlock() - s.StopTicker() - - p.event.Emit(event.SESSION_END, s.Id) log.Printf("Starting clean up of session [%s]\n", s.Id) for _, i := range s.Instances { err := p.InstanceDelete(s, i) @@ -101,7 +97,7 @@ func (p *pwd) SessionClose(s *types.Session) error { } } // Disconnect PWD daemon from the network - if err := p.docker(s.Id).DisconnectNetwork(config.PWDContainerName, s.Id); err != nil { + if err := p.docker(s.Id).DisconnectNetwork(config.L2ContainerName, s.Id); err != nil { if !strings.Contains(err.Error(), "is not connected to the network") { log.Println("ERROR NETWORKING") return err @@ -122,6 +118,7 @@ func (p *pwd) SessionClose(s *types.Session) error { log.Printf("Cleaned up session [%s]\n", s.Id) p.setGauges() + p.event.Emit(event.SESSION_END, s.Id) return nil } @@ -193,10 +190,6 @@ func (p *pwd) SessionGet(sessionId string) *types.Session { return nil } - if _, err := p.prepareSession(s); err != nil { - log.Println(err) - return nil - } return s } @@ -217,14 +210,11 @@ func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error if err != nil { return err } - if i.Docker == nil { - dock, err := p.docker(session.Id).New(i.IP, i.Cert, i.Key) - if err != nil { - return err - } - i.Docker = dock + dockerClient, err := p.dockerFactory.GetForInstance(session.Id, i.Name) + if err != nil { + return err } - tkns, err := i.Docker.SwarmInit() + tkns, err := dockerClient.SwarmInit() if err != nil { return err } @@ -251,31 +241,30 @@ func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error log.Println(err) return } - if c.IsSwarmManager || c.IsSwarmWorker { - // check if we have connection to the daemon, if not, create it - if i.Docker == nil { - dock, err := p.docker(session.Id).New(i.IP, i.Cert, i.Key) + + if firstSwarmManager != nil { + if c.IsSwarmManager { + dockerClient, err := p.dockerFactory.GetForInstance(session.Id, i.Name) if err != nil { log.Println(err) return } - i.Docker = dock - } - } - - if firstSwarmManager != nil { - if c.IsSwarmManager { // this is a swarm manager // cluster has already been initiated, join as manager - err := i.Docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager) + err = dockerClient.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager) if err != nil { log.Println(err) return } } if c.IsSwarmWorker { + dockerClient, err := p.dockerFactory.GetForInstance(session.Id, i.Name) + if err != nil { + log.Println(err) + return + } // this is a swarm worker - err := i.Docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker) + err = dockerClient.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker) if err != nil { log.Println(err) return @@ -290,11 +279,7 @@ func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error return nil } -func isSessionPrepared(sessionId string) bool { - _, ok := preparedSessions[sessionId] - return ok -} - +/* // This function should be called any time a session needs to be prepared: // 1. Like when it is created // 2. When it was loaded from storage @@ -306,16 +291,11 @@ func (p *pwd) prepareSession(session *types.Session) (bool, error) { return false, nil } - p.scheduleSessionClose(session) - // Connect PWD daemon to the new network if err := p.connectToNetwork(session); err != nil { return false, err } - // Schedule periodic tasks - p.tasks.Schedule(session) - for _, i := range session.Instances { // wire the session back to the instance i.Session = session @@ -325,16 +305,10 @@ func (p *pwd) prepareSession(session *types.Session) (bool, error) { return true, nil } - -func (p *pwd) scheduleSessionClose(s *types.Session) { - timeLeft := s.ExpiresAt.Sub(time.Now()) - s.SetClosingTimer(time.AfterFunc(timeLeft, func() { - p.SessionClose(s) - })) -} +*/ func (p *pwd) connectToNetwork(s *types.Session) error { - ip, err := p.docker(s.Id).ConnectNetwork(config.PWDContainerName, s.Id, s.PwdIpAddress) + ip, err := p.docker(s.Id).ConnectNetwork(config.L2ContainerName, s.Id, s.PwdIpAddress) if err != nil { log.Println("ERROR NETWORKING") return err diff --git a/pwd/session_provider_mock_test.go b/pwd/session_provider_mock_test.go deleted file mode 100644 index 04a2c14..0000000 --- a/pwd/session_provider_mock_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package pwd - -import "github.com/play-with-docker/play-with-docker/docker" - -type mockSessionProvider struct { - docker docker.DockerApi -} - -func (p *mockSessionProvider) GetDocker(sessionId string) (docker.DockerApi, error) { - return p.docker, nil -} diff --git a/pwd/session_test.go b/pwd/session_test.go index cb8e6d3..705695d 100644 --- a/pwd/session_test.go +++ b/pwd/session_test.go @@ -1,57 +1,43 @@ package pwd import ( - "fmt" "testing" "time" "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/event" - "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/play-with-docker/play-with-docker/storage" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestSessionNew(t *testing.T) { config.PWDContainerName = "pwd" - var connectContainerName, connectNetworkName, connectIP string - createdNetworkId := "" - saveCalled := false - expectedSessions := map[string]*types.Session{} - docker := &mockDocker{} - docker.createNetwork = func(id string) error { - createdNetworkId = id - return nil - } - docker.connectNetwork = func(containerName, networkName, ip string) (string, error) { - connectContainerName = containerName - connectNetworkName = networkName - connectIP = ip - return "10.0.0.1", nil - } - sp := &mockSessionProvider{docker: docker} + _d := &docker.Mock{} + _f := &docker.FactoryMock{} + _s := &storage.Mock{} + _g := &mockGenerator{} + _e := &event.Mock{} - var scheduledSession *types.Session - tasks := &mockTasks{} - tasks.schedule = func(s *types.Session) { - scheduledSession = s - } + _g.On("NewId").Return("aaaabbbbcccc") + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) + _d.On("CreateNetwork", "aaaabbbbcccc").Return(nil) + _d.On("ConnectNetwork", config.L2ContainerName, "aaaabbbbcccc", "").Return("10.0.0.1", nil) + _s.On("SessionPut", mock.AnythingOfType("*types.Session")).Return(nil) + _s.On("SessionCount").Return(1, nil) + _s.On("InstanceCount").Return(0, nil) - ev := event.NewLocalBroker() - storage := &mockStorage{} - storage.sessionPut = func(s *types.Session) error { - saveCalled = true - return nil - } + var nilArgs []interface{} + _e.M.On("Emit", event.SESSION_NEW, "aaaabbbbcccc", nilArgs).Return() - p := NewPWD(sp, tasks, ev, storage) + p := NewPWD(_f, _e, _s) + p.generator = _g before := time.Now() s, e := p.SessionNew(time.Hour, "", "", "") - expectedSessions[s.Id] = s - assert.Nil(t, e) assert.NotNil(t, s) @@ -60,119 +46,68 @@ func TestSessionNew(t *testing.T) { assert.NotEmpty(t, s.Id) assert.WithinDuration(t, s.CreatedAt, before, time.Since(before)) assert.WithinDuration(t, s.ExpiresAt, before.Add(time.Hour), time.Second) - assert.Equal(t, s.Id, createdNetworkId) assert.True(t, s.Ready) s, _ = p.SessionNew(time.Hour, "stackPath", "stackName", "imageName") - expectedSessions[s.Id] = s assert.Equal(t, "stackPath", s.Stack) assert.Equal(t, "stackName", s.StackName) assert.Equal(t, "imageName", s.ImageName) assert.False(t, s.Ready) - assert.NotNil(t, s.ClosingTimer()) - - assert.Equal(t, config.PWDContainerName, connectContainerName) - assert.Equal(t, s.Id, connectNetworkName) - assert.Empty(t, connectIP) - assert.Equal(t, "10.0.0.1", s.PwdIpAddress) - assert.Equal(t, s, scheduledSession) - - assert.True(t, saveCalled) + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) } func TestSessionSetup(t *testing.T) { - swarmInitOnMaster1 := false - manager2JoinedHasManager := false - manager3JoinedHasManager := false - worker1JoinedHasWorker := false + _d := &docker.Mock{} + _f := &docker.FactoryMock{} + _s := &storage.Mock{} + _g := &mockGenerator{} + _e := &event.Mock{} - dock := &mockDocker{} - dock.createContainer = func(opts docker.CreateContainerOpts) (string, error) { - if opts.Hostname == "manager1" { - return "10.0.0.1", nil - } else if opts.Hostname == "manager2" { - return "10.0.0.2", nil - } else if opts.Hostname == "manager3" { - return "10.0.0.3", nil - } else if opts.Hostname == "worker1" { - return "10.0.0.4", nil - } else if opts.Hostname == "other" { - return "10.0.0.5", nil - } else { - assert.Fail(t, "Should not have reached here") - } - return "", nil - } - dock.new = func(ip string, cert, key []byte) (docker.DockerApi, error) { - if ip == "10.0.0.1" { - return &mockDocker{ - swarmInit: func() (*docker.SwarmTokens, error) { - swarmInitOnMaster1 = true - return &docker.SwarmTokens{Worker: "worker-join-token", Manager: "manager-join-token"}, nil - }, - }, nil - } - if ip == "10.0.0.2" { - return &mockDocker{ - swarmInit: func() (*docker.SwarmTokens, error) { - assert.Fail(t, "Shouldn't have reached here.") - return nil, nil - }, - swarmJoin: func(addr, token string) error { - if addr == "10.0.0.1:2377" && token == "manager-join-token" { - manager2JoinedHasManager = true - return nil - } - assert.Fail(t, "Shouldn't have reached here.") - return nil - }, - }, nil - } - if ip == "10.0.0.3" { - return &mockDocker{ - swarmInit: func() (*docker.SwarmTokens, error) { - assert.Fail(t, "Shouldn't have reached here.") - return nil, nil - }, - swarmJoin: func(addr, token string) error { - if addr == "10.0.0.1:2377" && token == "manager-join-token" { - manager3JoinedHasManager = true - return nil - } - assert.Fail(t, "Shouldn't have reached here.") - return nil - }, - }, nil - } - if ip == "10.0.0.4" { - return &mockDocker{ - swarmInit: func() (*docker.SwarmTokens, error) { - assert.Fail(t, "Shouldn't have reached here.") - return nil, nil - }, - swarmJoin: func(addr, token string) error { - if addr == "10.0.0.1:2377" && token == "worker-join-token" { - worker1JoinedHasWorker = true - return nil - } - assert.Fail(t, "Shouldn't have reached here.") - return nil - }, - }, nil - } - assert.Fail(t, "Shouldn't have reached here.") - return nil, nil - } - sp := &mockSessionProvider{docker: dock} - tasks := &mockTasks{} - ev := event.NewLocalBroker() - storage := &mockStorage{} + _g.On("NewId").Return("aaaabbbbcccc") + _f.On("GetForSession", "aaaabbbbcccc").Return(_d, nil) + _d.On("CreateNetwork", "aaaabbbbcccc").Return(nil) + _d.On("ConnectNetwork", config.L2ContainerName, "aaaabbbbcccc", "").Return("10.0.0.1", nil) + _s.On("SessionPut", mock.AnythingOfType("*types.Session")).Return(nil) + _s.On("InstanceCreate", "aaaabbbbcccc", mock.AnythingOfType("*types.Instance")).Return(nil) + _s.On("SessionCount").Return(1, nil) + _s.On("InstanceCount").Return(0, nil) - p := NewPWD(sp, tasks, ev, storage) + _d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_manager1", Hostname: "manager1", Privileged: true}).Return("10.0.0.2", nil) + _f.On("GetForInstance", "aaaabbbbcccc", "aaaabbbb_manager1").Return(_d, nil) + _d.On("SwarmInit").Return(&docker.SwarmTokens{Manager: "managerToken", Worker: "workerToken"}, nil) + _e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_manager1", "10.0.0.2", "manager1"}).Return() + + _d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_manager2", Hostname: "manager2", Privileged: true}).Return("10.0.0.3", nil) + _f.On("GetForInstance", "aaaabbbbcccc", "aaaabbbb_manager2").Return(_d, nil) + _d.On("SwarmJoin", "10.0.0.2:2377", "managerToken").Return(nil) + _e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_manager2", "10.0.0.3", "manager2"}).Return() + + _d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind:overlay2-dev", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_manager3", Hostname: "manager3", Privileged: true}).Return("10.0.0.4", nil) + _f.On("GetForInstance", "aaaabbbbcccc", "aaaabbbb_manager3").Return(_d, nil) + _d.On("SwarmJoin", "10.0.0.2:2377", "managerToken").Return(nil) + _e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_manager3", "10.0.0.4", "manager3"}).Return() + + _d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_worker1", Hostname: "worker1", Privileged: true}).Return("10.0.0.5", nil) + _f.On("GetForInstance", "aaaabbbbcccc", "aaaabbbb_worker1").Return(_d, nil) + _d.On("SwarmJoin", "10.0.0.2:2377", "workerToken").Return(nil) + _e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_worker1", "10.0.0.5", "worker1"}).Return() + + _d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_other", Hostname: "other", Privileged: true}).Return("10.0.0.6", nil) + _e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_other", "10.0.0.6", "other"}).Return() + + var nilArgs []interface{} + _e.M.On("Emit", event.SESSION_NEW, "aaaabbbbcccc", nilArgs).Return() + + p := NewPWD(_f, _e, _s) + p.generator = _g s, e := p.SessionNew(time.Hour, "", "", "") assert.Nil(t, e) @@ -205,101 +140,9 @@ func TestSessionSetup(t *testing.T) { assert.Equal(t, 5, len(s.Instances)) - manager1 := fmt.Sprintf("%s_manager1", s.Id[:8]) - manager1Received := *s.Instances[manager1] - assert.Equal(t, types.Instance{ - Name: manager1, - Image: "franela/dind", - Hostname: "manager1", - IP: "10.0.0.1", - SessionId: s.Id, - Alias: "", - IsDockerHost: true, - Session: s, - Docker: manager1Received.Docker, - Proxy: manager1Received.Proxy, - }, manager1Received) - - manager2 := fmt.Sprintf("%s_manager2", s.Id[:8]) - manager2Received := *s.Instances[manager2] - assert.Equal(t, types.Instance{ - Name: manager2, - Image: "franela/dind", - Hostname: "manager2", - IP: "10.0.0.2", - Alias: "", - IsDockerHost: true, - SessionId: s.Id, - Session: s, - Docker: manager2Received.Docker, - Proxy: manager2Received.Proxy, - }, manager2Received) - - manager3 := fmt.Sprintf("%s_manager3", s.Id[:8]) - manager3Received := *s.Instances[manager3] - assert.Equal(t, types.Instance{ - Name: manager3, - Image: "franela/dind:overlay2-dev", - Hostname: "manager3", - IP: "10.0.0.3", - Alias: "", - SessionId: s.Id, - IsDockerHost: true, - Session: s, - Docker: manager3Received.Docker, - Proxy: manager3Received.Proxy, - }, manager3Received) - - worker1 := fmt.Sprintf("%s_worker1", s.Id[:8]) - worker1Received := *s.Instances[worker1] - assert.Equal(t, types.Instance{ - Name: worker1, - Image: "franela/dind", - Hostname: "worker1", - IP: "10.0.0.4", - Alias: "", - SessionId: s.Id, - IsDockerHost: true, - Session: s, - Docker: worker1Received.Docker, - Proxy: worker1Received.Proxy, - }, worker1Received) - - other := fmt.Sprintf("%s_other", s.Id[:8]) - otherReceived := *s.Instances[other] - assert.Equal(t, types.Instance{ - Name: other, - Image: "franela/dind", - Hostname: "other", - IP: "10.0.0.5", - Alias: "", - SessionId: s.Id, - IsDockerHost: true, - Session: s, - Docker: otherReceived.Docker, - Proxy: otherReceived.Proxy, - }, otherReceived) - - assert.True(t, swarmInitOnMaster1) - assert.True(t, manager2JoinedHasManager) - assert.True(t, manager3JoinedHasManager) - assert.True(t, worker1JoinedHasWorker) -} - -func TestSessionPrepareOnce(t *testing.T) { - dock := &mockDocker{} - tasks := &mockTasks{} - ev := event.NewLocalBroker() - storage := &mockStorage{} - sp := &mockSessionProvider{docker: dock} - - p := NewPWD(sp, tasks, ev, storage) - session := &types.Session{Id: "1234"} - prepared, err := p.prepareSession(session) - assert.True(t, preparedSessions[session.Id]) - assert.True(t, prepared) - - prepared, err = p.prepareSession(session) - assert.Nil(t, err) - assert.False(t, prepared) + _d.AssertExpectations(t) + _f.AssertExpectations(t) + _s.AssertExpectations(t) + _g.AssertExpectations(t) + _e.M.AssertExpectations(t) } diff --git a/pwd/storage_mock_test.go b/pwd/storage_mock_test.go deleted file mode 100644 index abbf40e..0000000 --- a/pwd/storage_mock_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package pwd - -import "github.com/play-with-docker/play-with-docker/pwd/types" - -type mockStorage struct { - sessionGet func(sessionId string) (*types.Session, error) - sessionGetAll func() (map[string]*types.Session, error) - sessionPut func(s *types.Session) error - sessionCount func() (int, error) - sessionDelete func(sessionId string) error - instanceFindByAlias func(sessionPrefix, alias string) (*types.Instance, error) - instanceFindByIP func(ip string) (*types.Instance, error) - instanceFindByIPAndSession func(sessionPrefix, ip string) (*types.Instance, error) - instanceCreate func(string, *types.Instance) error - instanceDelete func(sessionId string, instanceName string) error - instanceCount func() (int, error) - clientCount func() (int, error) -} - -func (m *mockStorage) SessionGet(sessionId string) (*types.Session, error) { - if m.sessionGet != nil { - return m.sessionGet(sessionId) - } - return nil, nil -} - -func (m *mockStorage) SessionGetAll() (map[string]*types.Session, error) { - if m.sessionGetAll != nil { - return m.sessionGetAll() - } - return nil, nil -} - -func (m *mockStorage) SessionPut(s *types.Session) error { - if m.sessionPut != nil { - return m.sessionPut(s) - } - return nil -} -func (m *mockStorage) SessionCount() (int, error) { - if m.sessionCount != nil { - return m.sessionCount() - } - return 0, nil -} -func (m *mockStorage) SessionDelete(sessionId string) error { - if m.sessionDelete != nil { - return m.sessionDelete(sessionId) - } - return nil -} -func (m *mockStorage) InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) { - if m.instanceFindByAlias != nil { - return m.instanceFindByAlias(sessionPrefix, alias) - } - return nil, nil -} -func (m *mockStorage) InstanceFindByIP(ip string) (*types.Instance, error) { - if m.instanceFindByIP != nil { - return m.instanceFindByIP(ip) - } - return nil, nil -} -func (m *mockStorage) InstanceFindByIPAndSession(sessionPrefix, ip string) (*types.Instance, error) { - if m.instanceFindByIPAndSession != nil { - return m.instanceFindByIPAndSession(sessionPrefix, ip) - } - return nil, nil -} -func (m *mockStorage) InstanceCreate(sessionId string, instance *types.Instance) error { - if m.instanceCreate != nil { - return m.instanceCreate(sessionId, instance) - } - return nil -} -func (m *mockStorage) InstanceDelete(sessionId, instanceName string) error { - if m.instanceDelete != nil { - return m.instanceDelete(sessionId, instanceName) - } - return nil -} -func (m *mockStorage) InstanceCount() (int, error) { - if m.instanceCount != nil { - return m.instanceCount() - } - return 0, nil -} -func (m *mockStorage) ClientCount() (int, error) { - if m.clientCount != nil { - return m.clientCount() - } - return 0, nil -} diff --git a/pwd/tasks.go b/pwd/tasks.go deleted file mode 100644 index 7d23b5b..0000000 --- a/pwd/tasks.go +++ /dev/null @@ -1,120 +0,0 @@ -package pwd - -import ( - "crypto/tls" - "fmt" - "log" - "net" - "net/http" - "sort" - "strings" - "sync" - "time" - - "github.com/docker/docker/api" - "github.com/docker/docker/client" - "github.com/docker/go-connections/tlsconfig" - "github.com/play-with-docker/play-with-docker/docker" - "github.com/play-with-docker/play-with-docker/event" - "github.com/play-with-docker/play-with-docker/provider" - "github.com/play-with-docker/play-with-docker/pwd/types" -) - -type periodicTask interface { - Run(i *types.Instance) error -} - -type SchedulerApi interface { - Schedule(session *types.Session) - Unschedule(session *types.Session) -} - -type scheduler struct { - event event.EventApi - periodicTasks []periodicTask -} - -func (sch *scheduler) Schedule(s *types.Session) { - if isSessionPrepared(s.Id) { - return - } - - go func() { - t := time.NewTicker(1 * time.Second) - s.SetTicker(t) - for range t.C { - var wg = sync.WaitGroup{} - wg.Add(len(s.Instances)) - for _, ins := range s.Instances { - var i *types.Instance = ins - if i.Docker == nil && i.IsDockerHost { - // Need to create client to the DinD docker daemon - - // We check if the client needs to use TLS - var tlsConfig *tls.Config - if len(i.Cert) > 0 && len(i.Key) > 0 { - tlsConfig = tlsconfig.ClientDefault() - tlsConfig.InsecureSkipVerify = true - tlsCert, err := tls.X509KeyPair(i.Cert, i.Key) - if err != nil { - log.Println("Could not load X509 key pair: %v. Make sure the key is not encrypted", err) - continue - } - tlsConfig.Certificates = []tls.Certificate{tlsCert} - } - - transport := &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 1 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext} - if tlsConfig != nil { - transport.TLSClientConfig = tlsConfig - } - cli := &http.Client{ - Transport: transport, - } - c, err := client.NewClient(fmt.Sprintf("http://%s:2375", i.IP), api.DefaultVersion, cli, nil) - if err != nil { - log.Println("Could not connect to DinD docker daemon", err) - } else { - i.Docker = docker.NewDocker(c) - } - } - go func() { - defer wg.Done() - for _, t := range sch.periodicTasks { - err := t.Run(i) - if err != nil { - if strings.Contains(err.Error(), "No such container") { - log.Printf("Container for instance [%s] doesn't exist any more.\n", i.IP) - //DeleteInstance(i.session, i) - } else { - log.Println(err) - } - break - } - } - }() - } - wg.Wait() - // broadcast all information - for _, ins := range s.Instances { - ins.Ports = types.UInt16Slice(ins.GetUsedPorts()) - sort.Sort(ins.Ports) - ins.CleanUsedPorts() - - sch.event.Emit(event.INSTANCE_STATS, ins.Session.Id, ins.Name, ins.Mem, ins.Cpu, ins.IsManager, ins.Ports) - } - } - }() -} - -func (sch *scheduler) Unschedule(s *types.Session) { -} - -func NewScheduler(e event.EventApi, sp provider.SessionProvider) *scheduler { - s := &scheduler{event: e} - s.periodicTasks = []periodicTask{&collectStatsTask{sessionProvider: sp}, &checkSwarmStatusTask{}, &checkUsedPortsTask{}, &checkSwarmUsedPortsTask{}} - return s -} diff --git a/pwd/tasks_mock_test.go b/pwd/tasks_mock_test.go deleted file mode 100644 index b184349..0000000 --- a/pwd/tasks_mock_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package pwd - -import "github.com/play-with-docker/play-with-docker/pwd/types" - -type mockTasks struct { - schedule func(s *types.Session) - unschedule func(s *types.Session) -} - -func (m *mockTasks) Schedule(s *types.Session) { - if m.schedule != nil { - m.schedule(s) - } -} -func (m *mockTasks) Unschedule(s *types.Session) { - if m.unschedule != nil { - m.unschedule(s) - } -} diff --git a/pwd/types/instance.go b/pwd/types/instance.go index b87a10c..e3cedd1 100644 --- a/pwd/types/instance.go +++ b/pwd/types/instance.go @@ -3,62 +3,22 @@ package types import ( "context" "sync" - - "github.com/play-with-docker/play-with-docker/docker" ) -type UInt16Slice []uint16 - -func (p UInt16Slice) Len() int { return len(p) } -func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - type Instance struct { - Image string `json:"image" bson:"image"` - Name string `json:"name" bson:"name"` - Hostname string `json:"hostname" bson:"hostname"` - IP string `json:"ip" bson:"ip"` - IsManager *bool `json:"is_manager" bson:"is_manager"` - Mem string `json:"mem" bson:"mem"` - Cpu string `json:"cpu" bson:"cpu"` - Alias string `json:"alias" bson:"alias"` - ServerCert []byte `json:"server_cert" bson:"server_cert"` - ServerKey []byte `json:"server_key" bson:"server_key"` - CACert []byte `json:"ca_cert" bson:"ca_cert"` - Cert []byte `json:"cert" bson:"cert"` - Key []byte `json:"key" bson:"key"` - IsDockerHost bool `json:"is_docker_host" bson:"is_docker_host"` - SessionId string `json:"session_id" bson:"session_id"` - SessionPrefix string `json:"session_prefix" bson:"session_prefix"` - Proxy string `json:"proxy" bson:"proxy"` - Docker docker.DockerApi `json:"-"` - Session *Session `json:"-" bson:"-"` - ctx context.Context `json:"-" bson:"-"` - tempPorts []uint16 `json:"-" bson:"-"` - Ports UInt16Slice - rw sync.Mutex -} - -func (i *Instance) SetUsedPort(port uint16) { - i.rw.Lock() - defer i.rw.Unlock() - - for _, p := range i.tempPorts { - if p == port { - return - } - } - i.tempPorts = append(i.tempPorts, port) -} -func (i *Instance) GetUsedPorts() []uint16 { - i.rw.Lock() - defer i.rw.Unlock() - - return i.tempPorts -} -func (i *Instance) CleanUsedPorts() { - i.rw.Lock() - defer i.rw.Unlock() - - i.tempPorts = []uint16{} + Image string `json:"image" bson:"image"` + Name string `json:"name" bson:"name"` + Hostname string `json:"hostname" bson:"hostname"` + IP string `json:"ip" bson:"ip"` + ServerCert []byte `json:"server_cert" bson:"server_cert"` + ServerKey []byte `json:"server_key" bson:"server_key"` + CACert []byte `json:"ca_cert" bson:"ca_cert"` + Cert []byte `json:"cert" bson:"cert"` + Key []byte `json:"key" bson:"key"` + IsDockerHost bool `json:"is_docker_host" bson:"is_docker_host"` + SessionId string `json:"session_id" bson:"session_id"` + Proxy string `json:"proxy" bson:"proxy"` + Session *Session `json:"-" bson:"-"` + ctx context.Context `json:"-" bson:"-"` + rw sync.Mutex } diff --git a/pwd/types/session.go b/pwd/types/session.go index 48c9f89..c7ac55b 100644 --- a/pwd/types/session.go +++ b/pwd/types/session.go @@ -17,9 +17,6 @@ type Session struct { ImageName string `json:"image_name"` Host string `json:"host"` Clients []*Client `json:"-" bson:"-"` - closingTimer *time.Timer `json:"-"` - scheduled bool `json:"-"` - ticker *time.Ticker `json:"-"` rw sync.Mutex `json:"-"` } @@ -30,18 +27,3 @@ func (s *Session) Lock() { func (s *Session) Unlock() { s.rw.Unlock() } - -func (s *Session) StopTicker() { - if s.ticker != nil { - s.ticker.Stop() - } -} -func (s *Session) SetTicker(t *time.Ticker) { - s.ticker = t -} -func (s *Session) SetClosingTimer(t *time.Timer) { - s.closingTimer = t -} -func (s *Session) ClosingTimer() *time.Timer { - return s.closingTimer -} diff --git a/task/scheduler.go b/scheduler/scheduler.go similarity index 62% rename from task/scheduler.go rename to scheduler/scheduler.go index feefe6c..7512fed 100644 --- a/task/scheduler.go +++ b/scheduler/scheduler.go @@ -1,10 +1,13 @@ -package task +package scheduler import ( "context" "fmt" + "log" "time" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd" "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/play-with-docker/play-with-docker/storage" ) @@ -26,21 +29,23 @@ type SchedulerApi interface { type scheduledSession struct { session *types.Session cancel context.CancelFunc - ctx context.Context ticker *time.Ticker } type scheduler struct { scheduledSessions map[string]*scheduledSession storage storage.StorageApi + event event.EventApi + pwd pwd.PWDApi tasks map[string]Task started bool } -func NewScheduler(s storage.StorageApi) (*scheduler, error) { - sch := &scheduler{storage: s} +func NewScheduler(s storage.StorageApi, e event.EventApi, p pwd.PWDApi) (*scheduler, error) { + sch := &scheduler{storage: s, event: e, pwd: p} sch.tasks = make(map[string]Task) + sch.scheduledSessions = make(map[string]*scheduledSession) err := sch.loadFromStorage() if err != nil { @@ -55,7 +60,6 @@ func (s *scheduler) loadFromStorage() error { if err != nil { return err } - s.scheduledSessions = make(map[string]*scheduledSession, len(sessions)) for _, session := range sessions { s.register(session) } @@ -81,27 +85,60 @@ func (s *scheduler) RemoveTask(task Task) error { return nil } +func (s *scheduler) Stop() { + for _, session := range s.scheduledSessions { + s.Unschedule(session.session) + } + s.started = false +} + func (s *scheduler) Start() { for _, session := range s.scheduledSessions { - go s.cron(session) + ctx, cancel := context.WithCancel(context.Background()) + session.cancel = cancel + session.ticker = time.NewTicker(1 * time.Second) + go s.cron(ctx, session) } + s.event.On(event.SESSION_NEW, func(sessionId string, args ...interface{}) { + session, err := s.storage.SessionGet(sessionId) + if err != nil { + log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err) + return + } + s.Schedule(session) + }) + s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) { + session, err := s.storage.SessionGet(sessionId) + if err != nil { + log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err) + return + } + err = s.Unschedule(session) + if err != nil { + log.Println(err) + return + } + }) s.started = true } func (s *scheduler) register(session *types.Session) *scheduledSession { - ctx, cancel := context.WithCancel(context.Background()) - s.scheduledSessions[session.Id] = &scheduledSession{session: session, cancel: cancel, ctx: ctx} + s.scheduledSessions[session.Id] = &scheduledSession{session: session} return s.scheduledSessions[session.Id] } -func (s *scheduler) cron(session *scheduledSession) { - session.ticker = time.NewTicker(1 * time.Second) - +func (s *scheduler) cron(ctx context.Context, session *scheduledSession) { for { select { case <-session.ticker.C: - s.processSession(session.ctx, session.session) - case <-session.ctx.Done(): + if time.Now().After(session.session.ExpiresAt) { + // Session has expired. Need to close the session. + s.pwd.SessionClose(session.session) + return + } else { + s.processSession(ctx, session.session) + } + case <-ctx.Done(): return } } @@ -115,7 +152,7 @@ func (s *scheduler) processSession(ctx context.Context, session *types.Session) func (s *scheduler) processInstance(ctx context.Context, instance *types.Instance) { for _, task := range s.tasks { - task.Run(ctx, instance) + go task.Run(ctx, instance) } } @@ -126,7 +163,10 @@ func (s *scheduler) Schedule(session *types.Session) error { if _, found := s.scheduledSessions[session.Id]; found { return fmt.Errorf("Session [%s] was already scheduled", session.Id) } - go s.cron(s.register(session)) + scheduledSession := s.register(session) + ctx, cancel := context.WithCancel(context.Background()) + scheduledSession.cancel = cancel + go s.cron(ctx, scheduledSession) return nil } diff --git a/task/scheduler_test.go b/scheduler/scheduler_test.go similarity index 53% rename from task/scheduler_test.go rename to scheduler/scheduler_test.go index 150599e..df14f7f 100644 --- a/task/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -1,4 +1,4 @@ -package task +package scheduler import ( "context" @@ -7,7 +7,10 @@ import ( "os" "sync" "testing" + "time" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd" "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/play-with-docker/play-with-docker/storage" "github.com/stretchr/testify/assert" @@ -40,7 +43,8 @@ func TestNew(t *testing.T) { store := mockStorage() s := &types.Session{ - Id: "aaabbbccc", + Id: "aaabbbccc", + ExpiresAt: time.Now().Add(time.Hour), Instances: map[string]*types.Instance{ "node1": &types.Instance{ Name: "node1", @@ -51,14 +55,14 @@ func TestNew(t *testing.T) { err := store.SessionPut(s) assert.Nil(t, err) - sch, err := NewScheduler(store) + sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{}) assert.Nil(t, err) assert.Len(t, sch.scheduledSessions, 1) } func TestAddTask(t *testing.T) { store := mockStorage() - sch, err := NewScheduler(store) + sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{}) assert.Nil(t, err) task := &mockTask{name: "FooBar"} @@ -73,7 +77,7 @@ func TestAddTask(t *testing.T) { func TestRemoveTask(t *testing.T) { store := mockStorage() - sch, err := NewScheduler(store) + sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{}) assert.Nil(t, err) task := &mockTask{name: "FooBar"} @@ -93,7 +97,8 @@ func TestStart(t *testing.T) { store := mockStorage() s := &types.Session{ - Id: "aaabbbccc", + Id: "aaabbbccc", + ExpiresAt: time.Now().Add(time.Hour), Instances: map[string]*types.Instance{ "node1": &types.Instance{ Name: "node1", @@ -104,7 +109,7 @@ func TestStart(t *testing.T) { err := store.SessionPut(s) assert.Nil(t, err) - sch, err := NewScheduler(store) + sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{}) assert.Nil(t, err) wg := sync.WaitGroup{} @@ -119,6 +124,78 @@ func TestStart(t *testing.T) { assert.Nil(t, err) sch.Start() + defer sch.Stop() wg.Wait() assert.True(t, ran) } + +func TestScheduleFromEvent(t *testing.T) { + s := &types.Session{ + Id: "aaaabbbbcccc", + ExpiresAt: time.Now().Add(time.Hour), + } + lb := event.NewLocalBroker() + store := mockStorage() + store.SessionPut(s) + sch, err := NewScheduler(store, lb, &pwd.Mock{}) + assert.Nil(t, err) + + sch.Start() + defer sch.Stop() + + lb.Emit(event.SESSION_NEW, s.Id) + + time.Sleep(time.Second) + + assert.Len(t, sch.scheduledSessions, 1) +} + +func TestUnscheduleFromEvent(t *testing.T) { + s := &types.Session{ + Id: "aaaabbbbcccc", + ExpiresAt: time.Now().Add(time.Hour), + } + lb := event.NewLocalBroker() + store := mockStorage() + store.SessionPut(s) + sch, err := NewScheduler(store, lb, &pwd.Mock{}) + assert.Nil(t, err) + + sch.Start() + defer sch.Stop() + + lb.Emit(event.SESSION_END, s.Id) + + time.Sleep(time.Second) + + assert.Len(t, sch.scheduledSessions, 0) +} + +func TestCloseSession(t *testing.T) { + _e := event.NewLocalBroker() + _p := &pwd.Mock{} + _s := mockStorage() + + s := &types.Session{ + Id: "aaaabbbbcccc", + ExpiresAt: time.Now().Add(-1 * time.Hour), + } + _p.On("SessionClose", s).Return(nil) + + _s.SessionPut(s) + + sch, err := NewScheduler(_s, _e, _p) + assert.Nil(t, err) + + sch.Start() + defer sch.Stop() + + time.Sleep(2 * time.Second) + + _p.AssertExpectations(t) + + _e.Emit(event.SESSION_END, s.Id) + time.Sleep(time.Second) + + assert.Len(t, sch.scheduledSessions, 0) +} diff --git a/scheduler/task/check_ports.go b/scheduler/task/check_ports.go new file mode 100644 index 0000000..29b66cd --- /dev/null +++ b/scheduler/task/check_ports.go @@ -0,0 +1,55 @@ +package task + +import ( + "context" + "log" + + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" +) + +type DockerPorts struct { + Instance string `json:"instance"` + Ports []int `json:"ports"` +} + +type checkPorts struct { + event event.EventApi + factory docker.FactoryApi +} + +var CheckPortsEvent event.EventType + +func init() { + CheckPortsEvent = event.NewEventType("instance docker ports") +} + +func (t *checkPorts) Name() string { + return "CheckPorts" +} + +func (t *checkPorts) Run(ctx context.Context, instance *types.Instance) error { + dockerClient, err := t.factory.GetForInstance(instance.SessionId, instance.Name) + if err != nil { + log.Println(err) + return err + } + + ps, err := dockerClient.GetPorts() + if err != nil { + log.Println(err) + return err + } + ports := make([]int, len(ps)) + for i, port := range ps { + ports[i] = int(port) + } + + t.event.Emit(CheckPortsEvent, instance.SessionId, DockerPorts{Instance: instance.Name, Ports: ports}) + return nil +} + +func NewCheckPorts(e event.EventApi, f docker.FactoryApi) *checkPorts { + return &checkPorts{event: e, factory: f} +} diff --git a/scheduler/task/check_ports_test.go b/scheduler/task/check_ports_test.go new file mode 100644 index 0000000..66ed436 --- /dev/null +++ b/scheduler/task/check_ports_test.go @@ -0,0 +1,48 @@ +package task + +import ( + "context" + "testing" + + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/stretchr/testify/assert" +) + +func TestCheckPorts_Name(t *testing.T) { + e := &event.Mock{} + f := &docker.FactoryMock{} + + task := NewCheckPorts(e, f) + + assert.Equal(t, "CheckPorts", task.Name()) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} + +func TestCheckPorts_Run(t *testing.T) { + d := &docker.Mock{} + e := &event.Mock{} + f := &docker.FactoryMock{} + + d.On("GetPorts").Return([]uint16{8080, 9090}, nil) + f.On("GetForInstance", "aaaabbbbcccc", "aaaabbbb_node1").Return(d, nil) + e.M.On("Emit", CheckPortsEvent, "aaaabbbbcccc", []interface{}{DockerPorts{Instance: "aaaabbbb_node1", Ports: []int{8080, 9090}}}).Return() + + i := &types.Instance{ + IP: "10.0.0.1", + Name: "aaaabbbb_node1", + SessionId: "aaaabbbbcccc", + } + + task := NewCheckPorts(e, f) + ctx := context.Background() + + err := task.Run(ctx, i) + + assert.Nil(t, err) + d.AssertExpectations(t) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} diff --git a/scheduler/task/check_swarm_ports.go b/scheduler/task/check_swarm_ports.go new file mode 100644 index 0000000..3357148 --- /dev/null +++ b/scheduler/task/check_swarm_ports.go @@ -0,0 +1,72 @@ +package task + +import ( + "context" + "fmt" + "log" + + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" +) + +type DockerSwarmPorts struct { + Manager string `json:"manager"` + Instances []string `json:"instances"` + Ports []int `json:"ports"` +} + +type checkSwarmPorts struct { + event event.EventApi + factory docker.FactoryApi +} + +var CheckSwarmPortsEvent event.EventType + +func init() { + CheckSwarmPortsEvent = event.NewEventType("instance docker swarm ports") +} + +func (t *checkSwarmPorts) Name() string { + return "CheckSwarmPorts" +} + +func (t *checkSwarmPorts) Run(ctx context.Context, instance *types.Instance) error { + dockerClient, err := t.factory.GetForInstance(instance.SessionId, instance.Name) + if err != nil { + log.Println(err) + return err + } + + status, err := getDockerSwarmStatus(ctx, dockerClient) + if err != nil { + log.Println(err) + return err + } + + if !status.IsManager { + return nil + } + + hosts, ps, err := dockerClient.GetSwarmPorts() + if err != nil { + log.Println(err) + return err + } + instances := make([]string, len(hosts)) + sessionPrefix := instance.SessionId[:8] + for i, host := range hosts { + instances[i] = fmt.Sprintf("%s_%s", sessionPrefix, host) + } + ports := make([]int, len(ps)) + for i, port := range ps { + ports[i] = int(port) + } + + t.event.Emit(CheckSwarmPortsEvent, instance.SessionId, DockerSwarmPorts{Manager: instance.Name, Instances: instances, Ports: ports}) + return nil +} + +func NewCheckSwarmPorts(e event.EventApi, f docker.FactoryApi) *checkSwarmPorts { + return &checkSwarmPorts{event: e, factory: f} +} diff --git a/scheduler/task/check_swarm_ports_test.go b/scheduler/task/check_swarm_ports_test.go new file mode 100644 index 0000000..8e5afc6 --- /dev/null +++ b/scheduler/task/check_swarm_ports_test.go @@ -0,0 +1,57 @@ +package task + +import ( + "context" + "testing" + + dockerTypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/swarm" + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/stretchr/testify/assert" +) + +func TestCheckSwarmPorts_Name(t *testing.T) { + e := &event.Mock{} + f := &docker.FactoryMock{} + + task := NewCheckSwarmPorts(e, f) + + assert.Equal(t, "CheckSwarmPorts", task.Name()) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} + +func TestCheckSwarmPorts_RunWhenManager(t *testing.T) { + d := &docker.Mock{} + e := &event.Mock{} + f := &docker.FactoryMock{} + + i := &types.Instance{ + IP: "10.0.0.1", + Name: "aaaabbbb_node1", + SessionId: "aaaabbbbcccc", + } + info := dockerTypes.Info{ + Swarm: swarm.Info{ + LocalNodeState: swarm.LocalNodeStateActive, + ControlAvailable: true, + }, + } + + f.On("GetForInstance", "aaaabbbbcccc", "aaaabbbb_node1").Return(d, nil) + d.On("GetDaemonInfo").Return(info, nil) + d.On("GetSwarmPorts").Return([]string{"node1", "node2"}, []uint16{8080, 9090}, nil) + e.M.On("Emit", CheckSwarmPortsEvent, "aaaabbbbcccc", []interface{}{DockerSwarmPorts{Manager: i.Name, Instances: []string{i.Name, "aaaabbbb_node2"}, Ports: []int{8080, 9090}}}).Return() + + task := NewCheckSwarmPorts(e, f) + ctx := context.Background() + + err := task.Run(ctx, i) + + assert.Nil(t, err) + d.AssertExpectations(t) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} diff --git a/scheduler/task/check_swarm_status.go b/scheduler/task/check_swarm_status.go new file mode 100644 index 0000000..8e02402 --- /dev/null +++ b/scheduler/task/check_swarm_status.go @@ -0,0 +1,69 @@ +package task + +import ( + "context" + "log" + + "github.com/docker/docker/api/types/swarm" + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" +) + +type DockerSwarmStatus struct { + IsManager bool `json:"is_manager"` + IsWorker bool `json:"is_worker"` + Instance string `json:"instance"` +} + +type checkSwarmStatus struct { + event event.EventApi + factory docker.FactoryApi +} + +var CheckSwarmStatusEvent event.EventType + +func init() { + CheckSwarmStatusEvent = event.NewEventType("instance docker swarm status") +} + +func (t *checkSwarmStatus) Name() string { + return "CheckSwarmStatus" +} + +func (t *checkSwarmStatus) Run(ctx context.Context, instance *types.Instance) error { + dockerClient, err := t.factory.GetForInstance(instance.SessionId, instance.Name) + if err != nil { + log.Println(err) + return err + } + + status, err := getDockerSwarmStatus(ctx, dockerClient) + if err != nil { + log.Println(err) + return err + } + status.Instance = instance.Name + + t.event.Emit(CheckSwarmStatusEvent, instance.SessionId, status) + return nil +} + +func NewCheckSwarmStatus(e event.EventApi, f docker.FactoryApi) *checkSwarmStatus { + return &checkSwarmStatus{event: e, factory: f} +} + +func getDockerSwarmStatus(ctx context.Context, client docker.DockerApi) (DockerSwarmStatus, error) { + status := DockerSwarmStatus{} + info, err := client.GetDaemonInfo() + if err != nil { + return status, err + } + + if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked { + status.IsManager = info.Swarm.ControlAvailable + status.IsWorker = !info.Swarm.ControlAvailable + } + + return status, nil +} diff --git a/scheduler/task/check_swarm_status_test.go b/scheduler/task/check_swarm_status_test.go new file mode 100644 index 0000000..1bafe25 --- /dev/null +++ b/scheduler/task/check_swarm_status_test.go @@ -0,0 +1,150 @@ +package task + +import ( + "context" + "testing" + + dockerTypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/swarm" + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/stretchr/testify/assert" +) + +func TestCheckSwarmStatus_Name(t *testing.T) { + e := &event.Mock{} + f := &docker.FactoryMock{} + + task := NewCheckSwarmStatus(e, f) + + assert.Equal(t, "CheckSwarmStatus", task.Name()) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} + +func TestCheckSwarmStatus_RunWhenInactive(t *testing.T) { + d := &docker.Mock{} + e := &event.Mock{} + f := &docker.FactoryMock{} + + i := &types.Instance{ + IP: "10.0.0.1", + Name: "node1", + SessionId: "aaabbbccc", + } + infoInactive := dockerTypes.Info{ + Swarm: swarm.Info{ + LocalNodeState: swarm.LocalNodeStateInactive, + }, + } + + f.On("GetForInstance", "aaabbbccc", "node1").Return(d, nil) + d.On("GetDaemonInfo").Return(infoInactive, nil) + e.M.On("Emit", CheckSwarmStatusEvent, "aaabbbccc", []interface{}{DockerSwarmStatus{IsManager: false, IsWorker: false, Instance: "node1"}}).Return() + + task := NewCheckSwarmStatus(e, f) + ctx := context.Background() + + err := task.Run(ctx, i) + + assert.Nil(t, err) + d.AssertExpectations(t) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} + +func TestCheckSwarmStatus_RunWhenLocked(t *testing.T) { + d := &docker.Mock{} + e := &event.Mock{} + f := &docker.FactoryMock{} + + i := &types.Instance{ + IP: "10.0.0.1", + Name: "node1", + SessionId: "aaabbbccc", + } + infoLocked := dockerTypes.Info{ + Swarm: swarm.Info{ + LocalNodeState: swarm.LocalNodeStateLocked, + }, + } + + f.On("GetForInstance", "aaabbbccc", "node1").Return(d, nil) + d.On("GetDaemonInfo").Return(infoLocked, nil) + e.M.On("Emit", CheckSwarmStatusEvent, "aaabbbccc", []interface{}{DockerSwarmStatus{IsManager: false, IsWorker: false, Instance: "node1"}}).Return() + + task := NewCheckSwarmStatus(e, f) + ctx := context.Background() + + err := task.Run(ctx, i) + + assert.Nil(t, err) + d.AssertExpectations(t) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} + +func TestCheckSwarmStatus_RunWhenManager(t *testing.T) { + d := &docker.Mock{} + e := &event.Mock{} + f := &docker.FactoryMock{} + + i := &types.Instance{ + IP: "10.0.0.1", + Name: "node1", + SessionId: "aaabbbccc", + } + infoLocked := dockerTypes.Info{ + Swarm: swarm.Info{ + LocalNodeState: swarm.LocalNodeStateActive, + ControlAvailable: true, + }, + } + + f.On("GetForInstance", "aaabbbccc", "node1").Return(d, nil) + d.On("GetDaemonInfo").Return(infoLocked, nil) + e.M.On("Emit", CheckSwarmStatusEvent, "aaabbbccc", []interface{}{DockerSwarmStatus{IsManager: true, IsWorker: false, Instance: "node1"}}).Return() + + task := NewCheckSwarmStatus(e, f) + ctx := context.Background() + + err := task.Run(ctx, i) + + assert.Nil(t, err) + d.AssertExpectations(t) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} + +func TestCheckSwarmStatus_RunWhenWorker(t *testing.T) { + d := &docker.Mock{} + e := &event.Mock{} + f := &docker.FactoryMock{} + + i := &types.Instance{ + IP: "10.0.0.1", + Name: "node1", + SessionId: "aaabbbccc", + } + infoLocked := dockerTypes.Info{ + Swarm: swarm.Info{ + LocalNodeState: swarm.LocalNodeStateActive, + ControlAvailable: false, + }, + } + + f.On("GetForInstance", "aaabbbccc", "node1").Return(d, nil) + d.On("GetDaemonInfo").Return(infoLocked, nil) + e.M.On("Emit", CheckSwarmStatusEvent, "aaabbbccc", []interface{}{DockerSwarmStatus{IsManager: false, IsWorker: true, Instance: "node1"}}).Return() + + task := NewCheckSwarmStatus(e, f) + ctx := context.Background() + + err := task.Run(ctx, i) + + assert.Nil(t, err) + d.AssertExpectations(t) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} diff --git a/scheduler/task/collect_stats.go b/scheduler/task/collect_stats.go new file mode 100644 index 0000000..30392e4 --- /dev/null +++ b/scheduler/task/collect_stats.go @@ -0,0 +1,93 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + "log" + + dockerTypes "github.com/docker/docker/api/types" + units "github.com/docker/go-units" + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" +) + +type InstanceStats struct { + Instance string `json:"instance"` + Mem string `json:"mem"` + Cpu string `json:"cpu"` +} + +type collectStats struct { + event event.EventApi + factory docker.FactoryApi +} + +var CollectStatsEvent event.EventType + +func init() { + CollectStatsEvent = event.NewEventType("instance stats") +} + +func (t *collectStats) Name() string { + return "CollectStats" +} + +func (t *collectStats) Run(ctx context.Context, instance *types.Instance) error { + dockerClient, err := t.factory.GetForSession(instance.SessionId) + if err != nil { + log.Println(err) + return err + } + reader, err := dockerClient.GetContainerStats(instance.Name) + if err != nil { + log.Println("Error while trying to collect instance stats", err) + return err + } + dec := json.NewDecoder(reader) + var v *dockerTypes.StatsJSON + e := dec.Decode(&v) + if e != nil { + log.Println("Error while trying to collect instance stats", e) + return err + } + stats := InstanceStats{Instance: instance.Name} + // Memory + var memPercent float64 = 0 + if v.MemoryStats.Limit != 0 { + memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 + } + mem := float64(v.MemoryStats.Usage) + memLimit := float64(v.MemoryStats.Limit) + + stats.Mem = fmt.Sprintf("%.2f%% (%s / %s)", memPercent, units.BytesSize(mem), units.BytesSize(memLimit)) + + // cpu + previousCPU := v.PreCPUStats.CPUUsage.TotalUsage + previousSystem := v.PreCPUStats.SystemUsage + cpuPercent := calculateCPUPercentUnix(previousCPU, previousSystem, v) + stats.Cpu = fmt.Sprintf("%.2f%%", cpuPercent) + + t.event.Emit(CollectStatsEvent, instance.SessionId, stats) + return nil +} + +func NewCollectStats(e event.EventApi, f docker.FactoryApi) *collectStats { + return &collectStats{event: e, factory: f} +} + +func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *dockerTypes.StatsJSON) float64 { + var ( + cpuPercent = 0.0 + // calculate the change for the cpu usage of the container in between readings + cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU) + // calculate the change for the entire system between readings + systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem) + ) + + if systemDelta > 0.0 && cpuDelta > 0.0 { + cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0 + } + return cpuPercent +} diff --git a/scheduler/task/collect_stats_test.go b/scheduler/task/collect_stats_test.go new file mode 100644 index 0000000..ccbd188 --- /dev/null +++ b/scheduler/task/collect_stats_test.go @@ -0,0 +1,71 @@ +package task + +import ( + "bytes" + "context" + "encoding/json" + "io" + "testing" + + dockerTypes "github.com/docker/docker/api/types" + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type mockSessionProvider struct { + mock.Mock +} + +func (m *mockSessionProvider) GetDocker(sessionId string) (docker.DockerApi, error) { + args := m.Called(sessionId) + + return args.Get(0).(docker.DockerApi), args.Error(1) +} + +type nopCloser struct { + io.Reader +} + +func (nopCloser) Close() error { return nil } + +func TestCollectStats_Name(t *testing.T) { + e := &event.Mock{} + f := &docker.FactoryMock{} + + task := NewCollectStats(e, f) + + assert.Equal(t, "CollectStats", task.Name()) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} + +func TestCollectStats_Run(t *testing.T) { + d := &docker.Mock{} + e := &event.Mock{} + f := &docker.FactoryMock{} + + stats := dockerTypes.StatsJSON{} + b, _ := json.Marshal(stats) + i := &types.Instance{ + IP: "10.0.0.1", + Name: "aaaabbbb_node1", + SessionId: "aaaabbbbcccc", + } + + f.On("GetForSession", i.SessionId).Return(d, nil) + d.On("GetContainerStats", i.Name).Return(nopCloser{bytes.NewReader(b)}, nil) + e.M.On("Emit", CollectStatsEvent, "aaaabbbbcccc", []interface{}{InstanceStats{Instance: i.Name, Mem: "0.00% (0B / 0B)", Cpu: "0.00%"}}).Return() + + task := NewCollectStats(e, f) + ctx := context.Background() + + err := task.Run(ctx, i) + + assert.Nil(t, err) + d.AssertExpectations(t) + e.M.AssertExpectations(t) + f.AssertExpectations(t) +} diff --git a/storage/file.go b/storage/file.go index 1925c29..9c91f44 100644 --- a/storage/file.go +++ b/storage/file.go @@ -48,27 +48,12 @@ func (store *storage) SessionPut(s *types.Session) error { return store.save() } -func (store *storage) InstanceFindByIP(ip string) (*types.Instance, error) { - store.rw.Lock() - defer store.rw.Unlock() - - for _, s := range store.db { - for _, i := range s.Instances { - if i.IP == ip { - return i, nil - } - } - } - - return nil, fmt.Errorf("%s", notFound) -} - -func (store *storage) InstanceFindByIPAndSession(sessionPrefix, ip string) (*types.Instance, error) { +func (store *storage) InstanceFind(sessionId, ip string) (*types.Instance, error) { store.rw.Lock() defer store.rw.Unlock() for id, s := range store.db { - if strings.HasPrefix(id, sessionPrefix) { + if strings.HasPrefix(id, sessionId[:8]) { for _, i := range s.Instances { if i.IP == ip { return i, nil @@ -80,23 +65,6 @@ func (store *storage) InstanceFindByIPAndSession(sessionPrefix, ip string) (*typ return nil, fmt.Errorf("%s", notFound) } -func (store *storage) InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) { - store.rw.Lock() - defer store.rw.Unlock() - - for id, s := range store.db { - if strings.HasPrefix(id, sessionPrefix) { - for _, i := range s.Instances { - if i.Alias == alias { - return i, nil - } - } - } - } - - return nil, fmt.Errorf("%s", notFound) -} - func (store *storage) InstanceCreate(sessionId string, instance *types.Instance) error { store.rw.Lock() defer store.rw.Unlock() diff --git a/storage/file_test.go b/storage/file_test.go index 71d711f..65a1a93 100644 --- a/storage/file_test.go +++ b/storage/file_test.go @@ -102,7 +102,7 @@ func TestSessionGetAll(t *testing.T) { assert.Equal(t, s2, loadedSessions[s2.Id]) } -func TestInstanceFindByIP(t *testing.T) { +func TestInstanceFind(t *testing.T) { tmpfile, err := ioutil.TempFile("", "pwd") if err != nil { log.Fatal(err) @@ -124,97 +124,23 @@ func TestInstanceFindByIP(t *testing.T) { err = storage.SessionPut(s2) assert.Nil(t, err) - foundInstance, err := storage.InstanceFindByIP("10.0.0.1") + foundInstance, err := storage.InstanceFind("session1", "10.0.0.1") assert.Nil(t, err) assert.Equal(t, i1, foundInstance) - foundInstance, err = storage.InstanceFindByIP("10.1.0.1") + foundInstance, err = storage.InstanceFind("session2", "10.1.0.1") assert.Nil(t, err) assert.Equal(t, i2, foundInstance) - foundInstance, err = storage.InstanceFindByIP("192.168.0.1") - assert.True(t, NotFound(err)) - assert.Nil(t, foundInstance) -} - -func TestInstanceFindByIPAndSession(t *testing.T) { - tmpfile, err := ioutil.TempFile("", "pwd") - if err != nil { - log.Fatal(err) - } - tmpfile.Close() - os.Remove(tmpfile.Name()) - defer os.Remove(tmpfile.Name()) - - storage, err := NewFileStorage(tmpfile.Name()) - - assert.Nil(t, err) - - i1 := &types.Instance{Name: "i1", IP: "10.0.0.1"} - i2 := &types.Instance{Name: "i2", IP: "10.1.0.1"} - s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}} - s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}} - err = storage.SessionPut(s1) - assert.Nil(t, err) - err = storage.SessionPut(s2) - assert.Nil(t, err) - - foundInstance, err := storage.InstanceFindByIPAndSession("session1", "10.0.0.1") - assert.Nil(t, err) - assert.Equal(t, i1, foundInstance) - - foundInstance, err = storage.InstanceFindByIPAndSession("session2", "10.1.0.1") - assert.Nil(t, err) - assert.Equal(t, i2, foundInstance) - - foundInstance, err = storage.InstanceFindByIPAndSession("session3", "10.1.0.1") + foundInstance, err = storage.InstanceFind("session3", "10.1.0.1") assert.True(t, NotFound(err)) assert.Nil(t, foundInstance) - foundInstance, err = storage.InstanceFindByIPAndSession("session1", "10.1.0.1") + foundInstance, err = storage.InstanceFind("session1", "10.1.0.1") assert.True(t, NotFound(err)) assert.Nil(t, foundInstance) - foundInstance, err = storage.InstanceFindByIPAndSession("session1", "192.168.0.1") - assert.True(t, NotFound(err)) - assert.Nil(t, foundInstance) -} - -func TestInstanceFindByAlias(t *testing.T) { - tmpfile, err := ioutil.TempFile("", "pwd") - if err != nil { - log.Fatal(err) - } - tmpfile.Close() - os.Remove(tmpfile.Name()) - defer os.Remove(tmpfile.Name()) - - storage, err := NewFileStorage(tmpfile.Name()) - - assert.Nil(t, err) - - i1 := &types.Instance{Name: "i1", Alias: "foo", IP: "10.0.0.1"} - i2 := &types.Instance{Name: "i2", Alias: "foo", IP: "10.1.0.1"} - s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}} - s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}} - err = storage.SessionPut(s1) - assert.Nil(t, err) - err = storage.SessionPut(s2) - assert.Nil(t, err) - - foundInstance, err := storage.InstanceFindByAlias("session1", "foo") - assert.Nil(t, err) - assert.Equal(t, i1, foundInstance) - - foundInstance, err = storage.InstanceFindByAlias("session2", "foo") - assert.Nil(t, err) - assert.Equal(t, i2, foundInstance) - - foundInstance, err = storage.InstanceFindByAlias("session1", "bar") - assert.True(t, NotFound(err)) - assert.Nil(t, foundInstance) - - foundInstance, err = storage.InstanceFindByAlias("session3", "foo") + foundInstance, err = storage.InstanceFind("session1", "192.168.0.1") assert.True(t, NotFound(err)) assert.Nil(t, foundInstance) } @@ -232,7 +158,7 @@ func TestInstanceCreate(t *testing.T) { assert.Nil(t, err) - i1 := &types.Instance{Name: "i1", Alias: "foo", IP: "10.0.0.1"} + i1 := &types.Instance{Name: "i1", IP: "10.0.0.1"} s1 := &types.Session{Id: "session1"} err = storage.SessionPut(s1) assert.Nil(t, err) @@ -260,8 +186,8 @@ func TestCounts(t *testing.T) { assert.Nil(t, err) c1 := &types.Client{} - i1 := &types.Instance{Name: "i1", Alias: "foo", IP: "10.0.0.1"} - i2 := &types.Instance{Name: "i2", Alias: "foo", IP: "10.1.0.1"} + i1 := &types.Instance{Name: "i1", IP: "10.0.0.1"} + i2 := &types.Instance{Name: "i2", IP: "10.1.0.1"} s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}} s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}} s3 := &types.Session{Id: "session3", Clients: []*types.Client{c1}} diff --git a/storage/mock.go b/storage/mock.go new file mode 100644 index 0000000..3398ece --- /dev/null +++ b/storage/mock.go @@ -0,0 +1,55 @@ +package storage + +import ( + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/stretchr/testify/mock" +) + +type Mock struct { + mock.Mock +} + +func (m *Mock) SessionGet(sessionId string) (*types.Session, error) { + args := m.Called(sessionId) + return args.Get(0).(*types.Session), args.Error(1) +} + +func (m *Mock) SessionPut(session *types.Session) error { + args := m.Called(session) + return args.Error(0) +} + +func (m *Mock) SessionCount() (int, error) { + args := m.Called() + return args.Int(0), args.Error(1) +} + +func (m *Mock) SessionDelete(sessionId string) error { + args := m.Called(sessionId) + return args.Error(0) +} + +func (m *Mock) SessionGetAll() (map[string]*types.Session, error) { + args := m.Called() + return args.Get(0).(map[string]*types.Session), args.Error(1) +} + +func (m *Mock) InstanceFind(sessionId, ip string) (*types.Instance, error) { + args := m.Called(sessionId, ip) + return args.Get(0).(*types.Instance), args.Error(1) +} + +func (m *Mock) InstanceCreate(sessionId string, instance *types.Instance) error { + args := m.Called(sessionId, instance) + return args.Error(0) +} + +func (m *Mock) InstanceDelete(sessionId, instanceName string) error { + args := m.Called(sessionId, instanceName) + return args.Error(0) +} + +func (m *Mock) InstanceCount() (int, error) { + args := m.Called() + return args.Int(0), args.Error(1) +} diff --git a/storage/storage.go b/storage/storage.go index 39f3fc6..5720acc 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -15,10 +15,7 @@ type StorageApi interface { SessionDelete(string) error SessionGetAll() (map[string]*types.Session, error) - InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) - // Should have the session id too, soon - InstanceFindByIP(ip string) (*types.Instance, error) - InstanceFindByIPAndSession(sessionPrefix, ip string) (*types.Instance, error) + InstanceFind(session, ip string) (*types.Instance, error) InstanceCreate(sessionId string, instance *types.Instance) error InstanceDelete(sessionId, instanceName string) error