diff --git a/handlers/bootstrap.go b/handlers/bootstrap.go index cb8252f..9e8e90b 100644 --- a/handlers/bootstrap.go +++ b/handlers/bootstrap.go @@ -3,11 +3,12 @@ package handlers import ( "log" "os" - "time" "github.com/docker/docker/client" + "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/pwd" + "github.com/play-with-docker/play-with-docker/storage" ) var core pwd.PWDApi @@ -28,16 +29,11 @@ func Bootstrap() { t := pwd.NewScheduler(Broadcast, d) - s := pwd.NewStorage() - - core = pwd.NewPWD(d, t, Broadcast, s) - - loadStart := time.Now() - err = core.SessionLoadAndPrepare() - loadElapsed := time.Since(loadStart) - log.Printf("***************** Loading stored sessions took %s\n", loadElapsed) + s, err := storage.NewFileStorage(config.SessionsFile) if err != nil && !os.IsNotExist(err) { log.Fatal("Error decoding sessions from disk ", err) } + core = pwd.NewPWD(d, t, Broadcast, s) + } diff --git a/pwd/check_swarm_status_task.go b/pwd/check_swarm_status_task.go index 3019a27..8f0ee94 100644 --- a/pwd/check_swarm_status_task.go +++ b/pwd/check_swarm_status_task.go @@ -4,16 +4,17 @@ import ( "log" "github.com/docker/docker/api/types/swarm" + "github.com/play-with-docker/play-with-docker/pwd/types" ) type checkSwarmStatusTask struct { } -func (c checkSwarmStatusTask) Run(i *Instance) error { - if i.docker == nil { +func (c checkSwarmStatusTask) Run(i *types.Instance) error { + if i.Docker == nil { return nil } - if info, err := i.docker.GetDaemonInfo(); err == nil { + if info, err := i.Docker.GetDaemonInfo(); err == nil { if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked { i.IsManager = &info.Swarm.ControlAvailable } else { diff --git a/pwd/check_swarm_used_ports.go b/pwd/check_swarm_used_ports.go index 7157240..a62dd05 100644 --- a/pwd/check_swarm_used_ports.go +++ b/pwd/check_swarm_used_ports.go @@ -3,27 +3,29 @@ package pwd import ( "fmt" "log" + + "github.com/play-with-docker/play-with-docker/pwd/types" ) type checkSwarmUsedPortsTask struct { } -func (c checkSwarmUsedPortsTask) Run(i *Instance) error { - if i.docker == nil { +func (c checkSwarmUsedPortsTask) Run(i *types.Instance) error { + if i.Docker == nil { return nil } if i.IsManager != nil && *i.IsManager { - sessionPrefix := i.session.Id[:8] + sessionPrefix := i.Session.Id[:8] // This is a swarm manager instance, then check for ports - if hosts, ports, err := i.docker.GetSwarmPorts(); err != nil { + if hosts, ports, err := i.Docker.GetSwarmPorts(); err != nil { log.Println(err) return err } else { for _, host := range hosts { host = fmt.Sprintf("%s_%s", sessionPrefix, host) for _, port := range ports { - if i.session.Instances[host] != nil { - i.session.Instances[host].setUsedPort(port) + if i.Session.Instances[host] != nil { + i.Session.Instances[host].SetUsedPort(port) } } } diff --git a/pwd/check_used_ports_task.go b/pwd/check_used_ports_task.go index 74e3f3b..83ffb89 100644 --- a/pwd/check_used_ports_task.go +++ b/pwd/check_used_ports_task.go @@ -1,17 +1,21 @@ package pwd -import "log" +import ( + "log" + + "github.com/play-with-docker/play-with-docker/pwd/types" +) type checkUsedPortsTask struct { } -func (c checkUsedPortsTask) Run(i *Instance) error { - if i.docker == nil { +func (c checkUsedPortsTask) Run(i *types.Instance) error { + if i.Docker == nil { return nil } - if ports, err := i.docker.GetPorts(); err == nil { + if ports, err := i.Docker.GetPorts(); err == nil { for _, p := range ports { - i.setUsedPort(uint16(p)) + i.SetUsedPort(uint16(p)) } } else { log.Println(err) diff --git a/pwd/client.go b/pwd/client.go index f50d758..6fc1021 100644 --- a/pwd/client.go +++ b/pwd/client.go @@ -3,51 +3,42 @@ package pwd import ( "log" "time" + + "github.com/play-with-docker/play-with-docker/pwd/types" ) -type Client struct { - Id string - viewPort ViewPort - session *Session -} - -type ViewPort struct { - Rows uint - Cols uint -} - -func (p *pwd) ClientNew(id string, session *Session) *Client { +func (p *pwd) ClientNew(id string, session *types.Session) *types.Client { defer observeAction("ClientNew", time.Now()) - c := &Client{Id: id, session: session} - session.clients = append(session.clients, c) + c := &types.Client{Id: id, Session: session} + session.Clients = append(session.Clients, c) return c } -func (p *pwd) ClientResizeViewPort(c *Client, cols, rows uint) { +func (p *pwd) ClientResizeViewPort(c *types.Client, cols, rows uint) { defer observeAction("ClientResizeViewPort", time.Now()) - c.viewPort.Rows = rows - c.viewPort.Cols = cols + c.ViewPort.Rows = rows + c.ViewPort.Cols = cols - p.notifyClientSmallestViewPort(c.session) + p.notifyClientSmallestViewPort(c.Session) } -func (p *pwd) ClientClose(client *Client) { +func (p *pwd) ClientClose(client *types.Client) { defer observeAction("ClientClose", time.Now()) // Client has disconnected. Remove from session and recheck terminal sizes. - session := client.session - for i, cl := range session.clients { + session := client.Session + for i, cl := range session.Clients { if cl.Id == client.Id { - session.clients = append(session.clients[:i], session.clients[i+1:]...) + session.Clients = append(session.Clients[:i], session.Clients[i+1:]...) break } } - if len(session.clients) > 0 { + if len(session.Clients) > 0 { p.notifyClientSmallestViewPort(session) } - setGauges() + p.setGauges() } -func (p *pwd) notifyClientSmallestViewPort(session *Session) { +func (p *pwd) notifyClientSmallestViewPort(session *types.Session) { vp := p.SessionGetSmallestViewPort(session) // Resize all terminals in the session p.broadcast.BroadcastTo(session.Id, "viewport resize", vp.Cols, vp.Rows) diff --git a/pwd/client_test.go b/pwd/client_test.go index 4aed04a..7ab9243 100644 --- a/pwd/client_test.go +++ b/pwd/client_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/stretchr/testify/assert" ) @@ -20,8 +21,8 @@ func TestClientNew(t *testing.T) { client := p.ClientNew("foobar", session) - assert.Equal(t, Client{Id: "foobar", session: session, viewPort: ViewPort{Cols: 0, Rows: 0}}, *client) - assert.Contains(t, session.clients, client) + assert.Equal(t, types.Client{Id: "foobar", Session: session, ViewPort: types.ViewPort{Cols: 0, Rows: 0}}, *client) + assert.Contains(t, session.Clients, client) } func TestClientResizeViewPort(t *testing.T) { @@ -49,7 +50,7 @@ func TestClientResizeViewPort(t *testing.T) { p.ClientResizeViewPort(client, 80, 24) - assert.Equal(t, ViewPort{Cols: 80, Rows: 24}, client.viewPort) + assert.Equal(t, types.ViewPort{Cols: 80, Rows: 24}, client.ViewPort) assert.Equal(t, session.Id, broadcastedSessionId) assert.Equal(t, "viewport resize", broadcastedEventName) assert.Equal(t, uint(80), broadcastedArgs[0]) diff --git a/pwd/collect_stats_task.go b/pwd/collect_stats_task.go index 200c35c..828d190 100644 --- a/pwd/collect_stats_task.go +++ b/pwd/collect_stats_task.go @@ -5,9 +5,10 @@ import ( "fmt" "log" - "github.com/docker/docker/api/types" + dockerTypes "github.com/docker/docker/api/types" units "github.com/docker/go-units" "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/pwd/types" ) type collectStatsTask struct { @@ -22,14 +23,14 @@ type collectStatsTask struct { docker docker.DockerApi } -func (c collectStatsTask) Run(i *Instance) error { +func (c collectStatsTask) Run(i *types.Instance) error { reader, err := c.docker.GetContainerStats(i.Name) if err != nil { log.Println("Error while trying to collect instance stats", err) return err } dec := json.NewDecoder(reader) - var v *types.StatsJSON + var v *dockerTypes.StatsJSON e := dec.Decode(&v) if e != nil { log.Println("Error while trying to collect instance stats", e) @@ -53,7 +54,7 @@ func (c collectStatsTask) Run(i *Instance) error { return nil } -func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 { +func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *dockerTypes.StatsJSON) float64 { var ( cpuPercent = 0.0 // calculate the change for the cpu usage of the container in between readings diff --git a/pwd/instance.go b/pwd/instance.go index e0807f5..c0abe36 100644 --- a/pwd/instance.go +++ b/pwd/instance.go @@ -1,19 +1,17 @@ package pwd import ( - "context" "fmt" "io" "log" - "net" "net/http" "path/filepath" "strings" - "sync" "time" "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/pwd/types" "golang.org/x/text/encoding" ) @@ -29,35 +27,6 @@ func (s *sessionWriter) Write(p []byte) (n int, err error) { return len(p), nil } -type UInt16Slice []uint16 - -func (p UInt16Slice) Len() int { return len(p) } -func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -type Instance struct { - Image string `json:"image"` - Name string `json:"name"` - Hostname string `json:"hostname"` - IP string `json:"ip"` - IsManager *bool `json:"is_manager"` - Mem string `json:"mem"` - Cpu string `json:"cpu"` - Alias string `json:"alias"` - ServerCert []byte `json:"server_cert"` - ServerKey []byte `json:"server_key"` - CACert []byte `json:"ca_cert"` - Cert []byte `json:"cert"` - Key []byte `json:"key"` - IsDockerHost bool `json:"is_docker_host"` - session *Session `json:"-"` - conn net.Conn `json:"-"` - ctx context.Context `json:"-"` - docker docker.DockerApi `json:"-"` - tempPorts []uint16 `json:"-"` - Ports UInt16Slice - rw sync.Mutex -} type InstanceConfig struct { ImageName string Alias string @@ -70,32 +39,12 @@ type InstanceConfig struct { Host string } -func (i *Instance) setUsedPort(port uint16) { - i.rw.Lock() - defer i.rw.Unlock() - - for _, p := range i.tempPorts { - if p == port { - return - } - } - i.tempPorts = append(i.tempPorts, port) -} -func (i *Instance) IsConnected() bool { - return i.conn != nil - -} - -func (i *Instance) SetSession(s *Session) { - i.session = s -} - -func (p *pwd) InstanceResizeTerminal(instance *Instance, rows, cols uint) error { +func (p *pwd) InstanceResizeTerminal(instance *types.Instance, rows, cols uint) error { defer observeAction("InstanceResizeTerminal", time.Now()) return p.docker.ContainerResize(instance.Name, rows, cols) } -func (p *pwd) InstanceAttachTerminal(instance *Instance) error { +func (p *pwd) InstanceAttachTerminal(instance *types.Instance) error { conn, err := p.docker.CreateAttachConnection(instance.Name) if err != nil { @@ -103,14 +52,14 @@ func (p *pwd) InstanceAttachTerminal(instance *Instance) error { } encoder := encoding.Replacement.NewEncoder() - sw := &sessionWriter{sessionId: instance.session.Id, instanceName: instance.Name, broadcast: p.broadcast} - instance.conn = conn + sw := &sessionWriter{sessionId: instance.Session.Id, instanceName: instance.Name, broadcast: p.broadcast} + instance.Terminal = conn io.Copy(encoder.Writer(sw), conn) return nil } -func (p *pwd) InstanceUploadFromUrl(instance *Instance, url string) error { +func (p *pwd) InstanceUploadFromUrl(instance *types.Instance, url string) error { defer observeAction("InstanceUploadFromUrl", time.Now()) log.Printf("Downloading file [%s]\n", url) resp, err := http.Get(url) @@ -133,41 +82,34 @@ func (p *pwd) InstanceUploadFromUrl(instance *Instance, url string) error { return nil } -func (p *pwd) InstanceGet(session *Session, name string) *Instance { +func (p *pwd) InstanceGet(session *types.Session, name string) *types.Instance { defer observeAction("InstanceGet", time.Now()) return session.Instances[name] } -func (p *pwd) InstanceFindByIP(ip string) *Instance { +func (p *pwd) InstanceFindByIP(ip string) *types.Instance { defer observeAction("InstanceFindByIP", time.Now()) - for _, s := range sessions { - for _, i := range s.Instances { - if i.IP == ip { - return i - } - } + i, err := p.storage.InstanceFindByIP(ip) + if err != nil { + return nil } - return nil + + return i } -func (p *pwd) InstanceFindByAlias(sessionPrefix, alias string) *Instance { +func (p *pwd) InstanceFindByAlias(sessionPrefix, alias string) *types.Instance { defer observeAction("InstanceFindByAlias", time.Now()) - for id, s := range sessions { - if strings.HasPrefix(id, sessionPrefix) { - for _, i := range s.Instances { - if i.Alias == alias { - return i - } - } - } + i, err := p.storage.InstanceFindByAlias(sessionPrefix, alias) + if err != nil { + return nil } - return nil + return i } -func (p *pwd) InstanceDelete(session *Session, instance *Instance) error { +func (p *pwd) InstanceDelete(session *types.Session, instance *types.Instance) error { defer observeAction("InstanceDelete", time.Now()) - if instance.conn != nil { - instance.conn.Close() + if instance.Terminal != nil { + instance.Terminal.Close() } err := p.docker.DeleteContainer(instance.Name) if err != nil && !strings.Contains(err.Error(), "No such container") { @@ -178,16 +120,16 @@ func (p *pwd) InstanceDelete(session *Session, instance *Instance) error { p.broadcast.BroadcastTo(session.Id, "delete instance", instance.Name) delete(session.Instances, instance.Name) - if err := p.storage.Save(); err != nil { + if err := p.storage.SessionPut(session); err != nil { return err } - setGauges() + p.setGauges() return nil } -func (p *pwd) checkHostnameExists(session *Session, hostname string) bool { +func (p *pwd) checkHostnameExists(session *types.Session, hostname string) bool { containerName := fmt.Sprintf("%s_%s", session.Id[:8], hostname) exists := false for _, instance := range session.Instances { @@ -199,10 +141,10 @@ func (p *pwd) checkHostnameExists(session *Session, hostname string) bool { return exists } -func (p *pwd) InstanceNew(session *Session, conf InstanceConfig) (*Instance, error) { +func (p *pwd) InstanceNew(session *types.Session, conf InstanceConfig) (*types.Instance, error) { defer observeAction("InstanceNew", time.Now()) - session.rw.Lock() - defer session.rw.Unlock() + session.Lock() + defer session.Unlock() if conf.ImageName == "" { conf.ImageName = config.GetDindImageName() @@ -247,7 +189,7 @@ func (p *pwd) InstanceNew(session *Session, conf InstanceConfig) (*Instance, err return nil, err } - instance := &Instance{} + instance := &types.Instance{} instance.Image = opts.Image instance.IP = ip instance.Name = containerName @@ -258,33 +200,33 @@ func (p *pwd) InstanceNew(session *Session, conf InstanceConfig) (*Instance, err instance.ServerCert = conf.ServerCert instance.ServerKey = conf.ServerKey instance.CACert = conf.CACert - instance.session = session + instance.Session = session // For now this condition holds through. In the future we might need a more complex logic. instance.IsDockerHost = opts.Privileged if session.Instances == nil { - session.Instances = make(map[string]*Instance) + session.Instances = make(map[string]*types.Instance) } session.Instances[instance.Name] = instance go p.InstanceAttachTerminal(instance) - err = p.storage.Save() + err = p.storage.SessionPut(session) if err != nil { return nil, err } p.broadcast.BroadcastTo(session.Id, "new instance", instance.Name, instance.IP, instance.Hostname) - setGauges() + p.setGauges() return instance, nil } -func (p *pwd) InstanceWriteToTerminal(instance *Instance, data string) { +func (p *pwd) InstanceWriteToTerminal(instance *types.Instance, data string) { defer observeAction("InstanceWriteToTerminal", time.Now()) - if instance != nil && instance.conn != nil && len(data) > 0 { - instance.conn.Write([]byte(data)) + if instance != nil && instance.Terminal != nil && len(data) > 0 { + instance.Terminal.Write([]byte(data)) } } @@ -298,7 +240,7 @@ func (p *pwd) InstanceAllowedImages() []string { } -func (p *pwd) InstanceExec(instance *Instance, cmd []string) (int, error) { +func (p *pwd) InstanceExec(instance *types.Instance, cmd []string) (int, error) { defer observeAction("InstanceExec", time.Now()) return p.docker.Exec(instance.Name, cmd) } diff --git a/pwd/instance_test.go b/pwd/instance_test.go index a97e1b8..ea7270e 100644 --- a/pwd/instance_test.go +++ b/pwd/instance_test.go @@ -8,6 +8,7 @@ import ( "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/stretchr/testify/assert" ) @@ -31,7 +32,7 @@ func TestInstanceResizeTerminal(t *testing.T) { p := NewPWD(docker, tasks, broadcast, storage) - err := p.InstanceResizeTerminal(&Instance{Name: "foobar"}, 24, 80) + err := p.InstanceResizeTerminal(&types.Instance{Name: "foobar"}, 24, 80) assert.Nil(t, err) assert.Equal(t, "foobar", resizedInstanceName) @@ -61,14 +62,14 @@ func TestInstanceNew(t *testing.T) { assert.Nil(t, err) - expectedInstance := Instance{ + expectedInstance := types.Instance{ Name: fmt.Sprintf("%s_node1", session.Id[:8]), Hostname: "node1", IP: "10.0.0.1", Alias: "", Image: config.GetDindImageName(), IsDockerHost: true, - session: session, + Session: session, } assert.Equal(t, expectedInstance, *instance) @@ -107,8 +108,8 @@ func TestInstanceNew_Concurrency(t *testing.T) { assert.Nil(t, err) - var instance1 *Instance - var instance2 *Instance + var instance1 *types.Instance + var instance2 *types.Instance wg := sync.WaitGroup{} wg.Add(2) @@ -152,14 +153,14 @@ func TestInstanceNew_WithNotAllowedImage(t *testing.T) { assert.Nil(t, err) - expectedInstance := Instance{ + expectedInstance := types.Instance{ Name: fmt.Sprintf("%s_node1", session.Id[:8]), Hostname: "node1", IP: "10.0.0.1", Alias: "", Image: "redis", IsDockerHost: false, - session: session, + Session: session, } assert.Equal(t, expectedInstance, *instance) @@ -200,14 +201,14 @@ func TestInstanceNew_WithCustomHostname(t *testing.T) { assert.Nil(t, err) - expectedInstance := Instance{ + expectedInstance := types.Instance{ Name: fmt.Sprintf("%s_redis-master", session.Id[:8]), Hostname: "redis-master", IP: "10.0.0.1", Alias: "", Image: "redis", IsDockerHost: false, - session: session, + Session: session, } assert.Equal(t, expectedInstance, *instance) diff --git a/pwd/pwd.go b/pwd/pwd.go index 124bd06..f0c3a1c 100644 --- a/pwd/pwd.go +++ b/pwd/pwd.go @@ -1,10 +1,11 @@ package pwd import ( - "sync" "time" "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/play-with-docker/play-with-docker/storage" "github.com/prometheus/client_golang/prometheus" ) @@ -33,65 +34,58 @@ func observeAction(action string, start time.Time) { latencyHistogramVec.WithLabelValues(action).Observe(float64(time.Since(start).Nanoseconds()) / 1000000) } -var sessions map[string]*Session -var sessionsMutex sync.Mutex - func init() { prometheus.MustRegister(sessionsGauge) prometheus.MustRegister(clientsGauge) prometheus.MustRegister(instancesGauge) prometheus.MustRegister(latencyHistogramVec) - - sessions = make(map[string]*Session) } type pwd struct { docker docker.DockerApi tasks SchedulerApi broadcast BroadcastApi - storage StorageApi + storage storage.StorageApi } type PWDApi interface { - SessionNew(duration time.Duration, stack string, stackName, imageName string) (*Session, error) - SessionClose(session *Session) error - SessionGetSmallestViewPort(session *Session) ViewPort - SessionDeployStack(session *Session) error - SessionGet(id string) *Session - SessionLoadAndPrepare() error - SessionSetup(session *Session, conf SessionSetupConf) error + SessionNew(duration time.Duration, stack string, stackName, imageName string) (*types.Session, error) + SessionClose(session *types.Session) error + SessionGetSmallestViewPort(session *types.Session) types.ViewPort + SessionDeployStack(session *types.Session) error + SessionGet(id string) *types.Session + SessionSetup(session *types.Session, conf SessionSetupConf) error - InstanceNew(session *Session, conf InstanceConfig) (*Instance, error) - InstanceResizeTerminal(instance *Instance, cols, rows uint) error - InstanceAttachTerminal(instance *Instance) error - InstanceUploadFromUrl(instance *Instance, url string) error - InstanceGet(session *Session, name string) *Instance - InstanceFindByIP(ip string) *Instance - InstanceFindByAlias(sessionPrefix, alias string) *Instance - InstanceDelete(session *Session, instance *Instance) error - InstanceWriteToTerminal(instance *Instance, data string) + InstanceNew(session *types.Session, conf InstanceConfig) (*types.Instance, error) + InstanceResizeTerminal(instance *types.Instance, cols, rows uint) error + InstanceAttachTerminal(instance *types.Instance) error + InstanceUploadFromUrl(instance *types.Instance, url string) error + InstanceGet(session *types.Session, name string) *types.Instance + InstanceFindByIP(ip string) *types.Instance + InstanceFindByAlias(sessionPrefix, alias string) *types.Instance + InstanceDelete(session *types.Session, instance *types.Instance) error + InstanceWriteToTerminal(instance *types.Instance, data string) InstanceAllowedImages() []string - InstanceExec(instance *Instance, cmd []string) (int, error) + InstanceExec(instance *types.Instance, cmd []string) (int, error) - ClientNew(id string, session *Session) *Client - ClientResizeViewPort(client *Client, cols, rows uint) - ClientClose(client *Client) + ClientNew(id string, session *types.Session) *types.Client + ClientResizeViewPort(client *types.Client, cols, rows uint) + ClientClose(client *types.Client) } -func NewPWD(d docker.DockerApi, t SchedulerApi, b BroadcastApi, s StorageApi) *pwd { +func NewPWD(d docker.DockerApi, t SchedulerApi, b BroadcastApi, s storage.StorageApi) *pwd { return &pwd{docker: d, tasks: t, broadcast: b, storage: s} } -func setGauges() { - var ins float64 - var cli float64 - - for _, s := range sessions { - ins += float64(len(s.Instances)) - cli += float64(len(s.clients)) - } +func (p *pwd) setGauges() { + s, _ := p.storage.SessionCount() + ses := float64(s) + i, _ := p.storage.InstanceCount() + ins := float64(i) + c, _ := p.storage.ClientCount() + cli := float64(c) clientsGauge.Set(cli) instancesGauge.Set(ins) - sessionsGauge.Set(float64(len(sessions))) + sessionsGauge.Set(ses) } diff --git a/pwd/session.go b/pwd/session.go index e584dc4..74026c7 100644 --- a/pwd/session.go +++ b/pwd/session.go @@ -11,6 +11,7 @@ import ( "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/twinj/uuid" ) @@ -35,32 +36,12 @@ type SessionSetupInstanceConf struct { IsSwarmWorker bool `json:"is_swarm_worker"` } -type Session struct { - rw sync.Mutex - Id string `json:"id"` - Instances map[string]*Instance `json:"instances"` - CreatedAt time.Time `json:"created_at"` - ExpiresAt time.Time `json:"expires_at"` - PwdIpAddress string `json:"pwd_ip_address"` - Ready bool `json:"ready"` - Stack string `json:"stack"` - StackName string `json:"stack_name"` - ImageName string `json:"image_name"` - closingTimer *time.Timer `json:"-"` - scheduled bool `json:"-"` - clients []*Client `json:"-"` - ticker *time.Ticker `json:"-"` -} - -func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName string) (*Session, error) { +func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName string) (*types.Session, error) { defer observeAction("SessionNew", time.Now()) - sessionsMutex.Lock() - defer sessionsMutex.Unlock() - - s := &Session{} + s := &types.Session{} s.Id = uuid.NewV4().String() - s.Instances = map[string]*Instance{} + s.Instances = map[string]*types.Instance{} s.CreatedAt = time.Now() s.ExpiresAt = s.CreatedAt.Add(duration) s.Ready = true @@ -88,24 +69,24 @@ func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName str return nil, err } - sessions[s.Id] = s - if err := p.storage.Save(); err != nil { + if err := p.storage.SessionPut(s); err != nil { log.Println(err) return nil, err } - setGauges() + p.setGauges() return s, nil } -func (p *pwd) SessionClose(s *Session) error { - s.rw.Lock() - defer s.rw.Unlock() +func (p *pwd) SessionClose(s *types.Session) error { + defer observeAction("SessionClose", time.Now()) + + s.Lock() + defer s.Unlock() + + s.StopTicker() - if s.ticker != nil { - s.ticker.Stop() - } p.broadcast.BroadcastTo(s.Id, "session end") p.broadcast.BroadcastTo(s.Id, "disconnect") log.Printf("Starting clean up of session [%s]\n", s.Id) @@ -130,32 +111,35 @@ func (p *pwd) SessionClose(s *Session) error { return err } } - delete(sessions, s.Id) - // We store sessions as soon as we delete one - if err := p.storage.Save(); err != nil { + err := p.storage.SessionDelete(s.Id) + if err != nil { return err } - setGauges() + log.Printf("Cleaned up session [%s]\n", s.Id) + p.setGauges() return nil } -func (p *pwd) SessionGetSmallestViewPort(s *Session) ViewPort { - minRows := s.clients[0].viewPort.Rows - minCols := s.clients[0].viewPort.Cols +func (p *pwd) SessionGetSmallestViewPort(s *types.Session) types.ViewPort { + defer observeAction("SessionGetSmallestViewPort", time.Now()) - for _, c := range s.clients { - minRows = uint(math.Min(float64(minRows), float64(c.viewPort.Rows))) - minCols = uint(math.Min(float64(minCols), float64(c.viewPort.Cols))) + minRows := s.Clients[0].ViewPort.Rows + minCols := s.Clients[0].ViewPort.Cols + + for _, c := range s.Clients { + minRows = uint(math.Min(float64(minRows), float64(c.ViewPort.Rows))) + minCols = uint(math.Min(float64(minCols), float64(c.ViewPort.Cols))) } - return ViewPort{Rows: minRows, Cols: minCols} + return types.ViewPort{Rows: minRows, Cols: minCols} } -func (p *pwd) SessionDeployStack(s *Session) error { +func (p *pwd) SessionDeployStack(s *types.Session) error { defer observeAction("SessionDeployStack", time.Now()) + if s.Ready { // a stack was already deployed on this session, just ignore return nil @@ -188,60 +172,28 @@ func (p *pwd) SessionDeployStack(s *Session) error { log.Printf("Stack execution finished with code %d\n", code) s.Ready = true p.broadcast.BroadcastTo(s.Id, "session ready", true) - if err := p.storage.Save(); err != nil { + if err := p.storage.SessionPut(s); err != nil { return err } return nil } -func (p *pwd) SessionGet(sessionId string) *Session { +func (p *pwd) SessionGet(sessionId string) *types.Session { defer observeAction("SessionGet", time.Now()) - s := sessions[sessionId] + + s, _ := p.storage.SessionGet(sessionId) + + if err := p.prepareSession(s); err != nil { + log.Println(err) + return nil + } return s } -func (p *pwd) SessionLoadAndPrepare() error { - defer observeAction("SessionLoadAndPrepare", time.Now()) - err := p.storage.Load() - if err != nil { - return err - } - - wg := sync.WaitGroup{} - for _, s := range sessions { - // Connect PWD daemon to the new network - if s.PwdIpAddress == "" { - log.Printf("Cannot load store session [%s] as they don't have the pwd ip address stored with them\n", s.Id) - continue - } - wg.Add(1) - go func(s *Session) { - s.rw.Lock() - defer s.rw.Unlock() - defer wg.Done() - - err := p.prepareSession(s) - if err != nil { - log.Println(err) - } - for _, i := range s.Instances { - // wire the session back to the instance - i.session = s - go p.InstanceAttachTerminal(i) - } - }(s) - } - - wg.Wait() - setGauges() - - return nil -} - -func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error { +func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error { defer observeAction("SessionSetup", time.Now()) var tokens *docker.SwarmTokens = nil - var firstSwarmManager *Instance = nil + var firstSwarmManager *types.Instance = nil // first look for a swarm manager and create it for _, conf := range conf.Instances { @@ -254,14 +206,14 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error { if err != nil { return err } - if i.docker == nil { + if i.Docker == nil { dock, err := p.docker.New(i.IP, i.Cert, i.Key) if err != nil { return err } - i.docker = dock + i.Docker = dock } - tkns, err := i.docker.SwarmInit() + tkns, err := i.Docker.SwarmInit() if err != nil { return err } @@ -290,13 +242,13 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error { } if c.IsSwarmManager || c.IsSwarmWorker { // check if we have connection to the daemon, if not, create it - if i.docker == nil { + if i.Docker == nil { dock, err := p.docker.New(i.IP, i.Cert, i.Key) if err != nil { log.Println(err) return } - i.docker = dock + i.Docker = dock } } @@ -304,7 +256,7 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error { if c.IsSwarmManager { // this is a swarm manager // cluster has already been initiated, join as manager - err := i.docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager) + err := i.Docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager) if err != nil { log.Println(err) return @@ -312,7 +264,7 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error { } if c.IsSwarmWorker { // this is a swarm worker - err := i.docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker) + err := i.Docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker) if err != nil { log.Println(err) return @@ -330,7 +282,14 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error { // This function should be called any time a session needs to be prepared: // 1. Like when it is created // 2. When it was loaded from storage -func (p *pwd) prepareSession(session *Session) error { +func (p *pwd) prepareSession(session *types.Session) error { + session.Lock() + defer session.Unlock() + + if session.IsPrepared() { + return nil + } + p.scheduleSessionClose(session) // Connect PWD daemon to the new network @@ -341,17 +300,24 @@ func (p *pwd) prepareSession(session *Session) error { // Schedule periodic tasks p.tasks.Schedule(session) + for _, i := range session.Instances { + // wire the session back to the instance + i.Session = session + go p.InstanceAttachTerminal(i) + } + session.SetPrepared() + return nil } -func (p *pwd) scheduleSessionClose(s *Session) { +func (p *pwd) scheduleSessionClose(s *types.Session) { timeLeft := s.ExpiresAt.Sub(time.Now()) - s.closingTimer = time.AfterFunc(timeLeft, func() { + s.SetClosingTimer(time.AfterFunc(timeLeft, func() { p.SessionClose(s) - }) + })) } -func (p *pwd) connectToNetwork(s *Session) error { +func (p *pwd) connectToNetwork(s *types.Session) error { ip, err := p.docker.ConnectNetwork(config.PWDContainerName, s.Id, s.PwdIpAddress) if err != nil { log.Println("ERROR NETWORKING") diff --git a/pwd/session_test.go b/pwd/session_test.go index 5432cc2..bef5a1d 100644 --- a/pwd/session_test.go +++ b/pwd/session_test.go @@ -2,24 +2,21 @@ package pwd import ( "fmt" - "net" - "sync" "testing" "time" "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/stretchr/testify/assert" ) func TestSessionNew(t *testing.T) { - sessions = map[string]*Session{} - config.PWDContainerName = "pwd" var connectContainerName, connectNetworkName, connectIP string createdNetworkId := "" saveCalled := false - expectedSessions := map[string]*Session{} + expectedSessions := map[string]*types.Session{} docker := &mockDocker{} docker.createNetwork = func(id string) error { @@ -33,15 +30,15 @@ func TestSessionNew(t *testing.T) { return "10.0.0.1", nil } - var scheduledSession *Session + var scheduledSession *types.Session tasks := &mockTasks{} - tasks.schedule = func(s *Session) { + tasks.schedule = func(s *types.Session) { scheduledSession = s } broadcast := &mockBroadcast{} storage := &mockStorage{} - storage.save = func() error { + storage.sessionPut = func(s *types.Session) error { saveCalled = true return nil } @@ -72,7 +69,7 @@ func TestSessionNew(t *testing.T) { assert.Equal(t, "imageName", s.ImageName) assert.False(t, s.Ready) - assert.NotNil(t, s.closingTimer) + assert.NotNil(t, s.ClosingTimer()) assert.Equal(t, config.PWDContainerName, connectContainerName) assert.Equal(t, s.Id, connectNetworkName) @@ -82,7 +79,6 @@ func TestSessionNew(t *testing.T) { assert.Equal(t, s, scheduledSession) - assert.Equal(t, expectedSessions, sessions) assert.True(t, saveCalled) } @@ -208,72 +204,72 @@ func TestSessionSetup(t *testing.T) { manager1 := fmt.Sprintf("%s_manager1", s.Id[:8]) manager1Received := *s.Instances[manager1] - assert.Equal(t, Instance{ + assert.Equal(t, types.Instance{ Name: manager1, Image: "franela/dind", Hostname: "manager1", IP: "10.0.0.1", Alias: "", IsDockerHost: true, - session: s, - conn: manager1Received.conn, - docker: manager1Received.docker, + Session: s, + Terminal: manager1Received.Terminal, + Docker: manager1Received.Docker, }, manager1Received) manager2 := fmt.Sprintf("%s_manager2", s.Id[:8]) manager2Received := *s.Instances[manager2] - assert.Equal(t, Instance{ + assert.Equal(t, types.Instance{ Name: manager2, Image: "franela/dind", Hostname: "manager2", IP: "10.0.0.2", Alias: "", IsDockerHost: true, - session: s, - conn: manager2Received.conn, - docker: manager2Received.docker, + Session: s, + Terminal: manager2Received.Terminal, + Docker: manager2Received.Docker, }, manager2Received) manager3 := fmt.Sprintf("%s_manager3", s.Id[:8]) manager3Received := *s.Instances[manager3] - assert.Equal(t, Instance{ + assert.Equal(t, types.Instance{ Name: manager3, Image: "franela/dind:overlay2-dev", Hostname: "manager3", IP: "10.0.0.3", Alias: "", IsDockerHost: true, - session: s, - conn: manager3Received.conn, - docker: manager3Received.docker, + Session: s, + Terminal: manager3Received.Terminal, + Docker: manager3Received.Docker, }, manager3Received) worker1 := fmt.Sprintf("%s_worker1", s.Id[:8]) worker1Received := *s.Instances[worker1] - assert.Equal(t, Instance{ + assert.Equal(t, types.Instance{ Name: worker1, Image: "franela/dind", Hostname: "worker1", IP: "10.0.0.4", Alias: "", IsDockerHost: true, - session: s, - conn: worker1Received.conn, - docker: worker1Received.docker, + Session: s, + Terminal: worker1Received.Terminal, + Docker: worker1Received.Docker, }, worker1Received) other := fmt.Sprintf("%s_other", s.Id[:8]) otherReceived := *s.Instances[other] - assert.Equal(t, Instance{ + assert.Equal(t, types.Instance{ Name: other, Image: "franela/dind", Hostname: "other", IP: "10.0.0.5", Alias: "", IsDockerHost: true, - session: s, - conn: otherReceived.conn, - docker: otherReceived.docker, + Session: s, + Terminal: otherReceived.Terminal, + Docker: otherReceived.Docker, }, otherReceived) assert.True(t, swarmInitOnMaster1) @@ -281,100 +277,3 @@ func TestSessionSetup(t *testing.T) { assert.True(t, manager3JoinedHasManager) assert.True(t, worker1JoinedHasWorker) } - -func TestSessionLoadAndPrepare(t *testing.T) { - config.PWDContainerName = "pwd" - lock := sync.Mutex{} - var s1NetworkConnect []string - var s2NetworkConnect []string - - wg := sync.WaitGroup{} - wg.Add(3) - connectedInstances := []string{} - sessions = map[string]*Session{} - i1 := &Instance{ - Image: "dind", - Name: "session1_i1", - Hostname: "i1", - IP: "10.0.0.10", - IsDockerHost: true, - } - i2 := &Instance{ - Image: "dind", - Name: "session1_i2", - Hostname: "i1", - IP: "10.0.0.11", - IsDockerHost: true, - } - i3 := &Instance{ - Image: "dind", - Name: "session1_i3", - Hostname: "i1", - IP: "10.0.0.12", - IsDockerHost: true, - } - s1 := &Session{ - Id: "session1", - Instances: map[string]*Instance{"session1_i1": i1}, - CreatedAt: time.Now(), - ExpiresAt: time.Now().Add(time.Hour), - PwdIpAddress: "10.0.0.1", - Ready: true, - Stack: "", - StackName: "", - } - s2 := &Session{ - Id: "session2", - Instances: map[string]*Instance{"session1_i2": i2, "session1_i3": i3}, - CreatedAt: time.Now(), - ExpiresAt: time.Now().Add(time.Hour), - PwdIpAddress: "10.0.0.2", - Ready: true, - Stack: "", - StackName: "", - } - - dock := &mockDocker{} - dock.createAttachConnection = func(instanceName string) (net.Conn, error) { - lock.Lock() - defer lock.Unlock() - connectedInstances = append(connectedInstances, instanceName) - wg.Done() - return &mockConn{}, nil - } - dock.connectNetwork = func(container, network, ip string) (string, error) { - if s1.Id == network { - s1NetworkConnect = []string{container, network, ip} - } else if s2.Id == network { - s2NetworkConnect = []string{container, network, ip} - } - return ip, nil - } - tasks := &mockTasks{} - tasks.schedule = func(s *Session) { - s.ticker = time.NewTicker(1 * time.Second) - } - broadcast := &mockBroadcast{} - storage := &mockStorage{} - - storage.load = func() error { - sessions = map[string]*Session{"session1": s1, "session2": s2} - return nil - } - - p := NewPWD(dock, tasks, broadcast, storage) - - err := p.SessionLoadAndPrepare() - assert.Nil(t, err) - assert.Len(t, sessions, 2) - assert.NotNil(t, s1.closingTimer) - assert.NotNil(t, s2.closingTimer) - assert.NotNil(t, s1.ticker) - assert.NotNil(t, s2.ticker) - - assert.Equal(t, []string{"pwd", s1.Id, s1.PwdIpAddress}, s1NetworkConnect) - assert.Equal(t, []string{"pwd", s2.Id, s2.PwdIpAddress}, s2NetworkConnect) - - wg.Wait() - assert.Subset(t, connectedInstances, []string{i1.Name, i2.Name, i3.Name}) -} diff --git a/pwd/storage.go b/pwd/storage.go deleted file mode 100644 index e3b2509..0000000 --- a/pwd/storage.go +++ /dev/null @@ -1,50 +0,0 @@ -package pwd - -import ( - "encoding/gob" - "os" - "sync" - - "github.com/play-with-docker/play-with-docker/config" -) - -type StorageApi interface { - Save() error - Load() error -} - -type storage struct { - rw sync.Mutex -} - -func (store *storage) Load() error { - file, err := os.Open(config.SessionsFile) - - if err == nil { - decoder := gob.NewDecoder(file) - err = decoder.Decode(&sessions) - - if err != nil { - return err - } - } - - file.Close() - return nil -} - -func (store *storage) Save() error { - store.rw.Lock() - defer store.rw.Unlock() - file, err := os.Create(config.SessionsFile) - if err == nil { - encoder := gob.NewEncoder(file) - err = encoder.Encode(&sessions) - } - file.Close() - return nil -} - -func NewStorage() *storage { - return &storage{} -} diff --git a/pwd/storage_mock_test.go b/pwd/storage_mock_test.go index 197c8f2..d9d5d73 100644 --- a/pwd/storage_mock_test.go +++ b/pwd/storage_mock_test.go @@ -1,19 +1,63 @@ package pwd +import "github.com/play-with-docker/play-with-docker/pwd/types" + type mockStorage struct { - save func() error - load func() error + sessionGet func(sessionId string) (*types.Session, error) + sessionPut func(s *types.Session) error + sessionCount func() (int, error) + sessionDelete func(sessionId string) error + instanceFindByAlias func(sessionPrefix, alias string) (*types.Instance, error) + instanceFindByIP func(ip string) (*types.Instance, error) + instanceCount func() (int, error) + clientCount func() (int, error) } -func (m *mockStorage) Save() error { - if m.save != nil { - return m.save() +func (m *mockStorage) SessionGet(sessionId string) (*types.Session, error) { + if m.sessionGet != nil { + return m.sessionGet(sessionId) + } + return nil, nil +} +func (m *mockStorage) SessionPut(s *types.Session) error { + if m.sessionPut != nil { + return m.sessionPut(s) } return nil } -func (m *mockStorage) Load() error { - if m.load != nil { - return m.load() +func (m *mockStorage) SessionCount() (int, error) { + if m.sessionCount != nil { + return m.sessionCount() + } + return 0, nil +} +func (m *mockStorage) SessionDelete(sessionId string) error { + if m.sessionDelete != nil { + return m.sessionDelete(sessionId) } return nil } +func (m *mockStorage) InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) { + if m.instanceFindByAlias != nil { + return m.instanceFindByAlias(sessionPrefix, alias) + } + return nil, nil +} +func (m *mockStorage) InstanceFindByIP(ip string) (*types.Instance, error) { + if m.instanceFindByIP != nil { + return m.instanceFindByIP(ip) + } + return nil, nil +} +func (m *mockStorage) InstanceCount() (int, error) { + if m.instanceCount != nil { + return m.instanceCount() + } + return 0, nil +} +func (m *mockStorage) ClientCount() (int, error) { + if m.clientCount != nil { + return m.clientCount() + } + return 0, nil +} diff --git a/pwd/tasks.go b/pwd/tasks.go index 2b8ae57..adc52ce 100644 --- a/pwd/tasks.go +++ b/pwd/tasks.go @@ -15,15 +15,16 @@ import ( "github.com/docker/docker/client" "github.com/docker/go-connections/tlsconfig" "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/pwd/types" ) type periodicTask interface { - Run(i *Instance) error + Run(i *types.Instance) error } type SchedulerApi interface { - Schedule(session *Session) - Unschedule(session *Session) + Schedule(session *types.Session) + Unschedule(session *types.Session) } type scheduler struct { @@ -31,21 +32,20 @@ type scheduler struct { periodicTasks []periodicTask } -func (sch *scheduler) Schedule(s *Session) { - if s.scheduled { +func (sch *scheduler) Schedule(s *types.Session) { + if s.IsPrepared() { return } go func() { - s.scheduled = true - - s.ticker = time.NewTicker(1 * time.Second) - for range s.ticker.C { + t := time.NewTicker(1 * time.Second) + s.SetTicker(t) + for range t.C { var wg = sync.WaitGroup{} wg.Add(len(s.Instances)) for _, ins := range s.Instances { - var i *Instance = ins - if i.docker == nil && i.IsDockerHost { + var i *types.Instance = ins + if i.Docker == nil && i.IsDockerHost { // Need to create client to the DinD docker daemon // We check if the client needs to use TLS @@ -76,7 +76,7 @@ func (sch *scheduler) Schedule(s *Session) { if err != nil { log.Println("Could not connect to DinD docker daemon", err) } else { - i.docker = docker.NewDocker(c) + i.Docker = docker.NewDocker(c) } } go func() { @@ -98,17 +98,17 @@ func (sch *scheduler) Schedule(s *Session) { wg.Wait() // broadcast all information for _, ins := range s.Instances { - ins.Ports = UInt16Slice(ins.tempPorts) + ins.Ports = types.UInt16Slice(ins.GetUsedPorts()) sort.Sort(ins.Ports) - ins.tempPorts = []uint16{} + ins.CleanUsedPorts() - sch.broadcast.BroadcastTo(ins.session.Id, "instance stats", ins.Name, ins.Mem, ins.Cpu, ins.IsManager, ins.Ports) + sch.broadcast.BroadcastTo(ins.Session.Id, "instance stats", ins.Name, ins.Mem, ins.Cpu, ins.IsManager, ins.Ports) } } }() } -func (sch *scheduler) Unschedule(s *Session) { +func (sch *scheduler) Unschedule(s *types.Session) { } func NewScheduler(b BroadcastApi, d docker.DockerApi) *scheduler { diff --git a/pwd/tasks_mock_test.go b/pwd/tasks_mock_test.go index e990c61..b184349 100644 --- a/pwd/tasks_mock_test.go +++ b/pwd/tasks_mock_test.go @@ -1,16 +1,18 @@ package pwd +import "github.com/play-with-docker/play-with-docker/pwd/types" + type mockTasks struct { - schedule func(s *Session) - unschedule func(s *Session) + schedule func(s *types.Session) + unschedule func(s *types.Session) } -func (m *mockTasks) Schedule(s *Session) { +func (m *mockTasks) Schedule(s *types.Session) { if m.schedule != nil { m.schedule(s) } } -func (m *mockTasks) Unschedule(s *Session) { +func (m *mockTasks) Unschedule(s *types.Session) { if m.unschedule != nil { m.unschedule(s) } diff --git a/pwd/types/client.go b/pwd/types/client.go new file mode 100644 index 0000000..2e31e5e --- /dev/null +++ b/pwd/types/client.go @@ -0,0 +1,12 @@ +package types + +type Client struct { + Id string + ViewPort ViewPort + Session *Session +} + +type ViewPort struct { + Rows uint + Cols uint +} diff --git a/pwd/types/instance.go b/pwd/types/instance.go new file mode 100644 index 0000000..b39f60e --- /dev/null +++ b/pwd/types/instance.go @@ -0,0 +1,63 @@ +package types + +import ( + "context" + "net" + "sync" + + "github.com/play-with-docker/play-with-docker/docker" +) + +type UInt16Slice []uint16 + +func (p UInt16Slice) Len() int { return len(p) } +func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +type Instance struct { + Image string `json:"image"` + Name string `json:"name"` + Hostname string `json:"hostname"` + IP string `json:"ip"` + IsManager *bool `json:"is_manager"` + Mem string `json:"mem"` + Cpu string `json:"cpu"` + Alias string `json:"alias"` + ServerCert []byte `json:"server_cert"` + ServerKey []byte `json:"server_key"` + CACert []byte `json:"ca_cert"` + Cert []byte `json:"cert"` + Key []byte `json:"key"` + IsDockerHost bool `json:"is_docker_host"` + Docker docker.DockerApi `json:"-"` + Session *Session `json:"-"` + Terminal net.Conn `json:"-"` + ctx context.Context `json:"-"` + tempPorts []uint16 `json:"-"` + Ports UInt16Slice + rw sync.Mutex +} + +func (i *Instance) SetUsedPort(port uint16) { + i.rw.Lock() + defer i.rw.Unlock() + + for _, p := range i.tempPorts { + if p == port { + return + } + } + i.tempPorts = append(i.tempPorts, port) +} +func (i *Instance) GetUsedPorts() []uint16 { + i.rw.Lock() + defer i.rw.Unlock() + + return i.tempPorts +} +func (i *Instance) CleanUsedPorts() { + i.rw.Lock() + defer i.rw.Unlock() + + i.tempPorts = []uint16{} +} diff --git a/pwd/types/session.go b/pwd/types/session.go new file mode 100644 index 0000000..1c0b384 --- /dev/null +++ b/pwd/types/session.go @@ -0,0 +1,55 @@ +package types + +import ( + "sync" + "time" +) + +type Session struct { + Id string `json:"id"` + Instances map[string]*Instance `json:"instances"` + CreatedAt time.Time `json:"created_at"` + ExpiresAt time.Time `json:"expires_at"` + PwdIpAddress string `json:"pwd_ip_address"` + Ready bool `json:"ready"` + Stack string `json:"stack"` + StackName string `json:"stack_name"` + ImageName string `json:"image_name"` + Clients []*Client `json:"-"` + closingTimer *time.Timer `json:"-"` + scheduled bool `json:"-"` + ticker *time.Ticker `json:"-"` + rw sync.Mutex `json:"-"` + prepared bool `json:"-"` +} + +func (s *Session) Lock() { + s.rw.Lock() +} + +func (s *Session) Unlock() { + s.rw.Unlock() +} + +func (s *Session) StopTicker() { + if s.ticker != nil { + s.ticker.Stop() + } +} +func (s *Session) SetTicker(t *time.Ticker) { + s.ticker = t +} +func (s *Session) SetClosingTimer(t *time.Timer) { + s.closingTimer = t +} +func (s *Session) ClosingTimer() *time.Timer { + return s.closingTimer +} + +func (s *Session) IsPrepared() bool { + return s.prepared +} + +func (s *Session) SetPrepared() { + s.prepared = true +} diff --git a/storage/file.go b/storage/file.go new file mode 100644 index 0000000..7b3dfa8 --- /dev/null +++ b/storage/file.go @@ -0,0 +1,150 @@ +package storage + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "sync" + + "github.com/play-with-docker/play-with-docker/pwd/types" +) + +type storage struct { + rw sync.Mutex + path string + db map[string]*types.Session +} + +func (store *storage) SessionGet(sessionId string) (*types.Session, error) { + store.rw.Lock() + defer store.rw.Unlock() + + s, found := store.db[sessionId] + if !found { + return nil, fmt.Errorf("%s", notFound) + } + + return s, nil +} +func (store *storage) SessionPut(s *types.Session) error { + store.rw.Lock() + defer store.rw.Unlock() + + store.db[s.Id] = s + + return store.save() +} + +func (store *storage) InstanceFindByIP(ip string) (*types.Instance, error) { + store.rw.Lock() + defer store.rw.Unlock() + + for _, s := range store.db { + for _, i := range s.Instances { + if i.IP == ip { + return i, nil + } + } + } + + return nil, fmt.Errorf("%s", notFound) +} + +func (store *storage) InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) { + store.rw.Lock() + defer store.rw.Unlock() + + for id, s := range store.db { + if strings.HasPrefix(id, sessionPrefix) { + for _, i := range s.Instances { + if i.Alias == alias { + return i, nil + } + } + } + } + + return nil, fmt.Errorf("%s", notFound) +} + +func (store *storage) SessionCount() (int, error) { + store.rw.Lock() + defer store.rw.Unlock() + + return len(store.db), nil +} + +func (store *storage) InstanceCount() (int, error) { + store.rw.Lock() + defer store.rw.Unlock() + + var ins int + + for _, s := range store.db { + ins += len(s.Instances) + } + + return ins, nil +} + +func (store *storage) ClientCount() (int, error) { + store.rw.Lock() + defer store.rw.Unlock() + + var cli int + + for _, s := range store.db { + cli += len(s.Clients) + } + + return cli, nil +} + +func (store *storage) SessionDelete(sessionId string) error { + store.rw.Lock() + defer store.rw.Unlock() + + delete(store.db, sessionId) + return store.save() +} + +func (store *storage) load() error { + file, err := os.Open(store.path) + + if err == nil { + decoder := json.NewDecoder(file) + err = decoder.Decode(&store.db) + + if err != nil { + return err + } + } else { + store.db = map[string]*types.Session{} + } + + file.Close() + return nil +} + +func (store *storage) save() error { + file, err := os.Create(store.path) + if err != nil { + return err + } + defer file.Close() + encoder := json.NewEncoder(file) + err = encoder.Encode(&store.db) + return err +} + +func NewFileStorage(path string) (StorageApi, error) { + s := &storage{path: path} + + err := s.load() + if err != nil { + return nil, err + } + + return s, nil +} diff --git a/storage/file_test.go b/storage/file_test.go new file mode 100644 index 0000000..47fa848 --- /dev/null +++ b/storage/file_test.go @@ -0,0 +1,218 @@ +package storage + +import ( + "encoding/json" + "io/ioutil" + "log" + "os" + "testing" + + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/stretchr/testify/assert" +) + +func TestSessionPut(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + tmpfile.Close() + os.Remove(tmpfile.Name()) + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + + s := &types.Session{Id: "a session"} + err = storage.SessionPut(s) + + assert.Nil(t, err) + + var loadedSessions map[string]*types.Session + expectedSessions := map[string]*types.Session{} + expectedSessions[s.Id] = s + + file, err := os.Open(tmpfile.Name()) + + assert.Nil(t, err) + defer file.Close() + + decoder := json.NewDecoder(file) + err = decoder.Decode(&loadedSessions) + + assert.Nil(t, err) + + assert.EqualValues(t, expectedSessions, loadedSessions) +} + +func TestSessionGet(t *testing.T) { + expectedSession := &types.Session{Id: "session1"} + sessions := map[string]*types.Session{} + sessions[expectedSession.Id] = expectedSession + + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + encoder := json.NewEncoder(tmpfile) + err = encoder.Encode(&sessions) + assert.Nil(t, err) + tmpfile.Close() + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + + _, err = storage.SessionGet("bad id") + assert.True(t, NotFound(err)) + + loadedSession, err := storage.SessionGet("session1") + assert.Nil(t, err) + + assert.Equal(t, expectedSession, loadedSession) +} + +func TestInstanceFindByIP(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + tmpfile.Close() + os.Remove(tmpfile.Name()) + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + + i1 := &types.Instance{Name: "i1", IP: "10.0.0.1"} + i2 := &types.Instance{Name: "i2", IP: "10.1.0.1"} + s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}} + s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}} + err = storage.SessionPut(s1) + assert.Nil(t, err) + err = storage.SessionPut(s2) + assert.Nil(t, err) + + foundInstance, err := storage.InstanceFindByIP("10.0.0.1") + assert.Nil(t, err) + assert.Equal(t, i1, foundInstance) + + foundInstance, err = storage.InstanceFindByIP("10.1.0.1") + assert.Nil(t, err) + assert.Equal(t, i2, foundInstance) + + foundInstance, err = storage.InstanceFindByIP("192.168.0.1") + assert.True(t, NotFound(err)) + assert.Nil(t, foundInstance) +} + +func TestInstanceFindByAlias(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + tmpfile.Close() + os.Remove(tmpfile.Name()) + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + + i1 := &types.Instance{Name: "i1", Alias: "foo", IP: "10.0.0.1"} + i2 := &types.Instance{Name: "i2", Alias: "foo", IP: "10.1.0.1"} + s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}} + s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}} + err = storage.SessionPut(s1) + assert.Nil(t, err) + err = storage.SessionPut(s2) + assert.Nil(t, err) + + foundInstance, err := storage.InstanceFindByAlias("session1", "foo") + assert.Nil(t, err) + assert.Equal(t, i1, foundInstance) + + foundInstance, err = storage.InstanceFindByAlias("session2", "foo") + assert.Nil(t, err) + assert.Equal(t, i2, foundInstance) + + foundInstance, err = storage.InstanceFindByAlias("session1", "bar") + assert.True(t, NotFound(err)) + assert.Nil(t, foundInstance) + + foundInstance, err = storage.InstanceFindByAlias("session3", "foo") + assert.True(t, NotFound(err)) + assert.Nil(t, foundInstance) +} + +func TestCounts(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + tmpfile.Close() + os.Remove(tmpfile.Name()) + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + + c1 := &types.Client{} + i1 := &types.Instance{Name: "i1", Alias: "foo", IP: "10.0.0.1"} + i2 := &types.Instance{Name: "i2", Alias: "foo", IP: "10.1.0.1"} + s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}} + s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}} + s3 := &types.Session{Id: "session3", Clients: []*types.Client{c1}} + + err = storage.SessionPut(s1) + assert.Nil(t, err) + err = storage.SessionPut(s2) + assert.Nil(t, err) + err = storage.SessionPut(s3) + assert.Nil(t, err) + + num, err := storage.SessionCount() + assert.Nil(t, err) + assert.Equal(t, 3, num) + + num, err = storage.InstanceCount() + assert.Nil(t, err) + assert.Equal(t, 2, num) + + num, err = storage.ClientCount() + assert.Nil(t, err) + assert.Equal(t, 1, num) +} + +func TestSessionDelete(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + tmpfile.Close() + os.Remove(tmpfile.Name()) + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + + s1 := &types.Session{Id: "session1"} + err = storage.SessionPut(s1) + assert.Nil(t, err) + + found, err := storage.SessionGet(s1.Id) + assert.Nil(t, err) + assert.Equal(t, s1, found) + + err = storage.SessionDelete(s1.Id) + assert.Nil(t, err) + + found, err = storage.SessionGet(s1.Id) + assert.True(t, NotFound(err)) + assert.Nil(t, found) +} diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000..2cd7e96 --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,23 @@ +package storage + +import "github.com/play-with-docker/play-with-docker/pwd/types" + +const notFound = "NotFound" + +func NotFound(e error) bool { + return e.Error() == notFound +} + +type StorageApi interface { + SessionGet(sessionId string) (*types.Session, error) + SessionPut(*types.Session) error + SessionCount() (int, error) + SessionDelete(sessionId string) error + + InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) + // Should have the session id too, soon + InstanceFindByIP(ip string) (*types.Instance, error) + InstanceCount() (int, error) + + ClientCount() (int, error) +}