Merge pull request #22 from xetorthio/storage_refactor

Storage has now it's own package.
This commit is contained in:
Jonathan Leibiusky
2017-07-06 15:14:42 -03:00
committed by GitHub
22 changed files with 892 additions and 513 deletions

View File

@@ -3,11 +3,12 @@ package handlers
import ( import (
"log" "log"
"os" "os"
"time"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/play-with-docker/play-with-docker/config"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd" "github.com/play-with-docker/play-with-docker/pwd"
"github.com/play-with-docker/play-with-docker/storage"
) )
var core pwd.PWDApi var core pwd.PWDApi
@@ -28,16 +29,11 @@ func Bootstrap() {
t := pwd.NewScheduler(Broadcast, d) t := pwd.NewScheduler(Broadcast, d)
s := pwd.NewStorage() s, err := storage.NewFileStorage(config.SessionsFile)
core = pwd.NewPWD(d, t, Broadcast, s)
loadStart := time.Now()
err = core.SessionLoadAndPrepare()
loadElapsed := time.Since(loadStart)
log.Printf("***************** Loading stored sessions took %s\n", loadElapsed)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
log.Fatal("Error decoding sessions from disk ", err) log.Fatal("Error decoding sessions from disk ", err)
} }
core = pwd.NewPWD(d, t, Broadcast, s)
} }

View File

@@ -4,16 +4,17 @@ import (
"log" "log"
"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/swarm"
"github.com/play-with-docker/play-with-docker/pwd/types"
) )
type checkSwarmStatusTask struct { type checkSwarmStatusTask struct {
} }
func (c checkSwarmStatusTask) Run(i *Instance) error { func (c checkSwarmStatusTask) Run(i *types.Instance) error {
if i.docker == nil { if i.Docker == nil {
return nil return nil
} }
if info, err := i.docker.GetDaemonInfo(); err == nil { if info, err := i.Docker.GetDaemonInfo(); err == nil {
if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked { if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked {
i.IsManager = &info.Swarm.ControlAvailable i.IsManager = &info.Swarm.ControlAvailable
} else { } else {

View File

@@ -3,27 +3,29 @@ package pwd
import ( import (
"fmt" "fmt"
"log" "log"
"github.com/play-with-docker/play-with-docker/pwd/types"
) )
type checkSwarmUsedPortsTask struct { type checkSwarmUsedPortsTask struct {
} }
func (c checkSwarmUsedPortsTask) Run(i *Instance) error { func (c checkSwarmUsedPortsTask) Run(i *types.Instance) error {
if i.docker == nil { if i.Docker == nil {
return nil return nil
} }
if i.IsManager != nil && *i.IsManager { if i.IsManager != nil && *i.IsManager {
sessionPrefix := i.session.Id[:8] sessionPrefix := i.Session.Id[:8]
// This is a swarm manager instance, then check for ports // This is a swarm manager instance, then check for ports
if hosts, ports, err := i.docker.GetSwarmPorts(); err != nil { if hosts, ports, err := i.Docker.GetSwarmPorts(); err != nil {
log.Println(err) log.Println(err)
return err return err
} else { } else {
for _, host := range hosts { for _, host := range hosts {
host = fmt.Sprintf("%s_%s", sessionPrefix, host) host = fmt.Sprintf("%s_%s", sessionPrefix, host)
for _, port := range ports { for _, port := range ports {
if i.session.Instances[host] != nil { if i.Session.Instances[host] != nil {
i.session.Instances[host].setUsedPort(port) i.Session.Instances[host].SetUsedPort(port)
} }
} }
} }

View File

@@ -1,17 +1,21 @@
package pwd package pwd
import "log" import (
"log"
"github.com/play-with-docker/play-with-docker/pwd/types"
)
type checkUsedPortsTask struct { type checkUsedPortsTask struct {
} }
func (c checkUsedPortsTask) Run(i *Instance) error { func (c checkUsedPortsTask) Run(i *types.Instance) error {
if i.docker == nil { if i.Docker == nil {
return nil return nil
} }
if ports, err := i.docker.GetPorts(); err == nil { if ports, err := i.Docker.GetPorts(); err == nil {
for _, p := range ports { for _, p := range ports {
i.setUsedPort(uint16(p)) i.SetUsedPort(uint16(p))
} }
} else { } else {
log.Println(err) log.Println(err)

View File

@@ -3,51 +3,42 @@ package pwd
import ( import (
"log" "log"
"time" "time"
"github.com/play-with-docker/play-with-docker/pwd/types"
) )
type Client struct { func (p *pwd) ClientNew(id string, session *types.Session) *types.Client {
Id string
viewPort ViewPort
session *Session
}
type ViewPort struct {
Rows uint
Cols uint
}
func (p *pwd) ClientNew(id string, session *Session) *Client {
defer observeAction("ClientNew", time.Now()) defer observeAction("ClientNew", time.Now())
c := &Client{Id: id, session: session} c := &types.Client{Id: id, Session: session}
session.clients = append(session.clients, c) session.Clients = append(session.Clients, c)
return c return c
} }
func (p *pwd) ClientResizeViewPort(c *Client, cols, rows uint) { func (p *pwd) ClientResizeViewPort(c *types.Client, cols, rows uint) {
defer observeAction("ClientResizeViewPort", time.Now()) defer observeAction("ClientResizeViewPort", time.Now())
c.viewPort.Rows = rows c.ViewPort.Rows = rows
c.viewPort.Cols = cols c.ViewPort.Cols = cols
p.notifyClientSmallestViewPort(c.session) p.notifyClientSmallestViewPort(c.Session)
} }
func (p *pwd) ClientClose(client *Client) { func (p *pwd) ClientClose(client *types.Client) {
defer observeAction("ClientClose", time.Now()) defer observeAction("ClientClose", time.Now())
// Client has disconnected. Remove from session and recheck terminal sizes. // Client has disconnected. Remove from session and recheck terminal sizes.
session := client.session session := client.Session
for i, cl := range session.clients { for i, cl := range session.Clients {
if cl.Id == client.Id { if cl.Id == client.Id {
session.clients = append(session.clients[:i], session.clients[i+1:]...) session.Clients = append(session.Clients[:i], session.Clients[i+1:]...)
break break
} }
} }
if len(session.clients) > 0 { if len(session.Clients) > 0 {
p.notifyClientSmallestViewPort(session) p.notifyClientSmallestViewPort(session)
} }
setGauges() p.setGauges()
} }
func (p *pwd) notifyClientSmallestViewPort(session *Session) { func (p *pwd) notifyClientSmallestViewPort(session *types.Session) {
vp := p.SessionGetSmallestViewPort(session) vp := p.SessionGetSmallestViewPort(session)
// Resize all terminals in the session // Resize all terminals in the session
p.broadcast.BroadcastTo(session.Id, "viewport resize", vp.Cols, vp.Rows) p.broadcast.BroadcastTo(session.Id, "viewport resize", vp.Cols, vp.Rows)

View File

@@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@@ -20,8 +21,8 @@ func TestClientNew(t *testing.T) {
client := p.ClientNew("foobar", session) client := p.ClientNew("foobar", session)
assert.Equal(t, Client{Id: "foobar", session: session, viewPort: ViewPort{Cols: 0, Rows: 0}}, *client) assert.Equal(t, types.Client{Id: "foobar", Session: session, ViewPort: types.ViewPort{Cols: 0, Rows: 0}}, *client)
assert.Contains(t, session.clients, client) assert.Contains(t, session.Clients, client)
} }
func TestClientResizeViewPort(t *testing.T) { func TestClientResizeViewPort(t *testing.T) {
@@ -49,7 +50,7 @@ func TestClientResizeViewPort(t *testing.T) {
p.ClientResizeViewPort(client, 80, 24) p.ClientResizeViewPort(client, 80, 24)
assert.Equal(t, ViewPort{Cols: 80, Rows: 24}, client.viewPort) assert.Equal(t, types.ViewPort{Cols: 80, Rows: 24}, client.ViewPort)
assert.Equal(t, session.Id, broadcastedSessionId) assert.Equal(t, session.Id, broadcastedSessionId)
assert.Equal(t, "viewport resize", broadcastedEventName) assert.Equal(t, "viewport resize", broadcastedEventName)
assert.Equal(t, uint(80), broadcastedArgs[0]) assert.Equal(t, uint(80), broadcastedArgs[0])

View File

@@ -5,9 +5,10 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/docker/docker/api/types" dockerTypes "github.com/docker/docker/api/types"
units "github.com/docker/go-units" units "github.com/docker/go-units"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
) )
type collectStatsTask struct { type collectStatsTask struct {
@@ -22,14 +23,14 @@ type collectStatsTask struct {
docker docker.DockerApi docker docker.DockerApi
} }
func (c collectStatsTask) Run(i *Instance) error { func (c collectStatsTask) Run(i *types.Instance) error {
reader, err := c.docker.GetContainerStats(i.Name) reader, err := c.docker.GetContainerStats(i.Name)
if err != nil { if err != nil {
log.Println("Error while trying to collect instance stats", err) log.Println("Error while trying to collect instance stats", err)
return err return err
} }
dec := json.NewDecoder(reader) dec := json.NewDecoder(reader)
var v *types.StatsJSON var v *dockerTypes.StatsJSON
e := dec.Decode(&v) e := dec.Decode(&v)
if e != nil { if e != nil {
log.Println("Error while trying to collect instance stats", e) log.Println("Error while trying to collect instance stats", e)
@@ -53,7 +54,7 @@ func (c collectStatsTask) Run(i *Instance) error {
return nil return nil
} }
func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 { func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *dockerTypes.StatsJSON) float64 {
var ( var (
cpuPercent = 0.0 cpuPercent = 0.0
// calculate the change for the cpu usage of the container in between readings // calculate the change for the cpu usage of the container in between readings

View File

@@ -1,19 +1,17 @@
package pwd package pwd
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"log" "log"
"net"
"net/http" "net/http"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time" "time"
"github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/config"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
"golang.org/x/text/encoding" "golang.org/x/text/encoding"
) )
@@ -29,35 +27,6 @@ func (s *sessionWriter) Write(p []byte) (n int, err error) {
return len(p), nil return len(p), nil
} }
type UInt16Slice []uint16
func (p UInt16Slice) Len() int { return len(p) }
func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type Instance struct {
Image string `json:"image"`
Name string `json:"name"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
IsManager *bool `json:"is_manager"`
Mem string `json:"mem"`
Cpu string `json:"cpu"`
Alias string `json:"alias"`
ServerCert []byte `json:"server_cert"`
ServerKey []byte `json:"server_key"`
CACert []byte `json:"ca_cert"`
Cert []byte `json:"cert"`
Key []byte `json:"key"`
IsDockerHost bool `json:"is_docker_host"`
session *Session `json:"-"`
conn net.Conn `json:"-"`
ctx context.Context `json:"-"`
docker docker.DockerApi `json:"-"`
tempPorts []uint16 `json:"-"`
Ports UInt16Slice
rw sync.Mutex
}
type InstanceConfig struct { type InstanceConfig struct {
ImageName string ImageName string
Alias string Alias string
@@ -70,32 +39,12 @@ type InstanceConfig struct {
Host string Host string
} }
func (i *Instance) setUsedPort(port uint16) { func (p *pwd) InstanceResizeTerminal(instance *types.Instance, rows, cols uint) error {
i.rw.Lock()
defer i.rw.Unlock()
for _, p := range i.tempPorts {
if p == port {
return
}
}
i.tempPorts = append(i.tempPorts, port)
}
func (i *Instance) IsConnected() bool {
return i.conn != nil
}
func (i *Instance) SetSession(s *Session) {
i.session = s
}
func (p *pwd) InstanceResizeTerminal(instance *Instance, rows, cols uint) error {
defer observeAction("InstanceResizeTerminal", time.Now()) defer observeAction("InstanceResizeTerminal", time.Now())
return p.docker.ContainerResize(instance.Name, rows, cols) return p.docker.ContainerResize(instance.Name, rows, cols)
} }
func (p *pwd) InstanceAttachTerminal(instance *Instance) error { func (p *pwd) InstanceAttachTerminal(instance *types.Instance) error {
conn, err := p.docker.CreateAttachConnection(instance.Name) conn, err := p.docker.CreateAttachConnection(instance.Name)
if err != nil { if err != nil {
@@ -103,14 +52,14 @@ func (p *pwd) InstanceAttachTerminal(instance *Instance) error {
} }
encoder := encoding.Replacement.NewEncoder() encoder := encoding.Replacement.NewEncoder()
sw := &sessionWriter{sessionId: instance.session.Id, instanceName: instance.Name, broadcast: p.broadcast} sw := &sessionWriter{sessionId: instance.Session.Id, instanceName: instance.Name, broadcast: p.broadcast}
instance.conn = conn instance.Terminal = conn
io.Copy(encoder.Writer(sw), conn) io.Copy(encoder.Writer(sw), conn)
return nil return nil
} }
func (p *pwd) InstanceUploadFromUrl(instance *Instance, url string) error { func (p *pwd) InstanceUploadFromUrl(instance *types.Instance, url string) error {
defer observeAction("InstanceUploadFromUrl", time.Now()) defer observeAction("InstanceUploadFromUrl", time.Now())
log.Printf("Downloading file [%s]\n", url) log.Printf("Downloading file [%s]\n", url)
resp, err := http.Get(url) resp, err := http.Get(url)
@@ -133,7 +82,7 @@ func (p *pwd) InstanceUploadFromUrl(instance *Instance, url string) error {
return nil return nil
} }
func (p *pwd) InstanceUploadFromReader(instance *Instance, fileName string, reader io.Reader) error { func (p *pwd) InstanceUploadFromReader(instance *types.Instance, fileName string, reader io.Reader) error {
defer observeAction("InstanceUploadFromReader", time.Now()) defer observeAction("InstanceUploadFromReader", time.Now())
copyErr := p.docker.CopyToContainer(instance.Name, "/var/run/pwd/uploads", fileName, reader) copyErr := p.docker.CopyToContainer(instance.Name, "/var/run/pwd/uploads", fileName, reader)
@@ -145,55 +94,44 @@ func (p *pwd) InstanceUploadFromReader(instance *Instance, fileName string, read
return nil return nil
} }
func (p *pwd) InstanceGet(session *Session, name string) *Instance { func (p *pwd) InstanceGet(session *types.Session, name string) *types.Instance {
defer observeAction("InstanceGet", time.Now()) defer observeAction("InstanceGet", time.Now())
return session.Instances[name] return session.Instances[name]
} }
func (p *pwd) InstanceFindByIP(ip string) *Instance { func (p *pwd) InstanceFindByIP(ip string) *types.Instance {
defer observeAction("InstanceFindByIP", time.Now()) defer observeAction("InstanceFindByIP", time.Now())
for _, s := range sessions { i, err := p.storage.InstanceFindByIP(ip)
for _, i := range s.Instances { if err != nil {
if i.IP == ip { return nil
return i
}
}
} }
return nil
return i
} }
func (p *pwd) InstanceFindByIPAndSession(sessionPrefix, ip string) *Instance { func (p *pwd) InstanceFindByIPAndSession(sessionPrefix, ip string) *types.Instance {
defer observeAction("InstanceFindByIPAndSession", time.Now()) defer observeAction("InstanceFindByIPAndSession", time.Now())
for id, s := range sessions { i, err := p.storage.InstanceFindByIPAndSession(sessionPrefix, ip)
if strings.HasPrefix(id, sessionPrefix) { if err != nil {
for _, i := range s.Instances { return nil
if i.IP == ip {
return i
}
}
}
} }
return nil
return i
} }
func (p *pwd) InstanceFindByAlias(sessionPrefix, alias string) *Instance { func (p *pwd) InstanceFindByAlias(sessionPrefix, alias string) *types.Instance {
defer observeAction("InstanceFindByAlias", time.Now()) defer observeAction("InstanceFindByAlias", time.Now())
for id, s := range sessions { i, err := p.storage.InstanceFindByAlias(sessionPrefix, alias)
if strings.HasPrefix(id, sessionPrefix) { if err != nil {
for _, i := range s.Instances { return nil
if i.Alias == alias {
return i
}
}
}
} }
return nil return i
} }
func (p *pwd) InstanceDelete(session *Session, instance *Instance) error { func (p *pwd) InstanceDelete(session *types.Session, instance *types.Instance) error {
defer observeAction("InstanceDelete", time.Now()) defer observeAction("InstanceDelete", time.Now())
if instance.conn != nil { if instance.Terminal != nil {
instance.conn.Close() instance.Terminal.Close()
} }
err := p.docker.DeleteContainer(instance.Name) err := p.docker.DeleteContainer(instance.Name)
if err != nil && !strings.Contains(err.Error(), "No such container") { if err != nil && !strings.Contains(err.Error(), "No such container") {
@@ -204,16 +142,16 @@ func (p *pwd) InstanceDelete(session *Session, instance *Instance) error {
p.broadcast.BroadcastTo(session.Id, "delete instance", instance.Name) p.broadcast.BroadcastTo(session.Id, "delete instance", instance.Name)
delete(session.Instances, instance.Name) delete(session.Instances, instance.Name)
if err := p.storage.Save(); err != nil { if err := p.storage.SessionPut(session); err != nil {
return err return err
} }
setGauges() p.setGauges()
return nil return nil
} }
func (p *pwd) checkHostnameExists(session *Session, hostname string) bool { func (p *pwd) checkHostnameExists(session *types.Session, hostname string) bool {
containerName := fmt.Sprintf("%s_%s", session.Id[:8], hostname) containerName := fmt.Sprintf("%s_%s", session.Id[:8], hostname)
exists := false exists := false
for _, instance := range session.Instances { for _, instance := range session.Instances {
@@ -225,10 +163,10 @@ func (p *pwd) checkHostnameExists(session *Session, hostname string) bool {
return exists return exists
} }
func (p *pwd) InstanceNew(session *Session, conf InstanceConfig) (*Instance, error) { func (p *pwd) InstanceNew(session *types.Session, conf InstanceConfig) (*types.Instance, error) {
defer observeAction("InstanceNew", time.Now()) defer observeAction("InstanceNew", time.Now())
session.rw.Lock() session.Lock()
defer session.rw.Unlock() defer session.Unlock()
if conf.ImageName == "" { if conf.ImageName == "" {
conf.ImageName = config.GetDindImageName() conf.ImageName = config.GetDindImageName()
@@ -273,7 +211,7 @@ func (p *pwd) InstanceNew(session *Session, conf InstanceConfig) (*Instance, err
return nil, err return nil, err
} }
instance := &Instance{} instance := &types.Instance{}
instance.Image = opts.Image instance.Image = opts.Image
instance.IP = ip instance.IP = ip
instance.Name = containerName instance.Name = containerName
@@ -284,33 +222,33 @@ func (p *pwd) InstanceNew(session *Session, conf InstanceConfig) (*Instance, err
instance.ServerCert = conf.ServerCert instance.ServerCert = conf.ServerCert
instance.ServerKey = conf.ServerKey instance.ServerKey = conf.ServerKey
instance.CACert = conf.CACert instance.CACert = conf.CACert
instance.session = session instance.Session = session
// For now this condition holds through. In the future we might need a more complex logic. // For now this condition holds through. In the future we might need a more complex logic.
instance.IsDockerHost = opts.Privileged instance.IsDockerHost = opts.Privileged
if session.Instances == nil { if session.Instances == nil {
session.Instances = make(map[string]*Instance) session.Instances = make(map[string]*types.Instance)
} }
session.Instances[instance.Name] = instance session.Instances[instance.Name] = instance
go p.InstanceAttachTerminal(instance) go p.InstanceAttachTerminal(instance)
err = p.storage.Save() err = p.storage.SessionPut(session)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.broadcast.BroadcastTo(session.Id, "new instance", instance.Name, instance.IP, instance.Hostname) p.broadcast.BroadcastTo(session.Id, "new instance", instance.Name, instance.IP, instance.Hostname)
setGauges() p.setGauges()
return instance, nil return instance, nil
} }
func (p *pwd) InstanceWriteToTerminal(instance *Instance, data string) { func (p *pwd) InstanceWriteToTerminal(instance *types.Instance, data string) {
defer observeAction("InstanceWriteToTerminal", time.Now()) defer observeAction("InstanceWriteToTerminal", time.Now())
if instance != nil && instance.conn != nil && len(data) > 0 { if instance != nil && instance.Terminal != nil && len(data) > 0 {
instance.conn.Write([]byte(data)) instance.Terminal.Write([]byte(data))
} }
} }
@@ -325,7 +263,7 @@ func (p *pwd) InstanceAllowedImages() []string {
} }
func (p *pwd) InstanceExec(instance *Instance, cmd []string) (int, error) { func (p *pwd) InstanceExec(instance *types.Instance, cmd []string) (int, error) {
defer observeAction("InstanceExec", time.Now()) defer observeAction("InstanceExec", time.Now())
return p.docker.Exec(instance.Name, cmd) return p.docker.Exec(instance.Name, cmd)
} }

View File

@@ -8,6 +8,7 @@ import (
"github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/config"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@@ -31,7 +32,7 @@ func TestInstanceResizeTerminal(t *testing.T) {
p := NewPWD(docker, tasks, broadcast, storage) p := NewPWD(docker, tasks, broadcast, storage)
err := p.InstanceResizeTerminal(&Instance{Name: "foobar"}, 24, 80) err := p.InstanceResizeTerminal(&types.Instance{Name: "foobar"}, 24, 80)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, "foobar", resizedInstanceName) assert.Equal(t, "foobar", resizedInstanceName)
@@ -61,14 +62,14 @@ func TestInstanceNew(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
expectedInstance := Instance{ expectedInstance := types.Instance{
Name: fmt.Sprintf("%s_node1", session.Id[:8]), Name: fmt.Sprintf("%s_node1", session.Id[:8]),
Hostname: "node1", Hostname: "node1",
IP: "10.0.0.1", IP: "10.0.0.1",
Alias: "", Alias: "",
Image: config.GetDindImageName(), Image: config.GetDindImageName(),
IsDockerHost: true, IsDockerHost: true,
session: session, Session: session,
} }
assert.Equal(t, expectedInstance, *instance) assert.Equal(t, expectedInstance, *instance)
@@ -107,8 +108,8 @@ func TestInstanceNew_Concurrency(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
var instance1 *Instance var instance1 *types.Instance
var instance2 *Instance var instance2 *types.Instance
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(2) wg.Add(2)
@@ -152,14 +153,14 @@ func TestInstanceNew_WithNotAllowedImage(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
expectedInstance := Instance{ expectedInstance := types.Instance{
Name: fmt.Sprintf("%s_node1", session.Id[:8]), Name: fmt.Sprintf("%s_node1", session.Id[:8]),
Hostname: "node1", Hostname: "node1",
IP: "10.0.0.1", IP: "10.0.0.1",
Alias: "", Alias: "",
Image: "redis", Image: "redis",
IsDockerHost: false, IsDockerHost: false,
session: session, Session: session,
} }
assert.Equal(t, expectedInstance, *instance) assert.Equal(t, expectedInstance, *instance)
@@ -200,14 +201,14 @@ func TestInstanceNew_WithCustomHostname(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
expectedInstance := Instance{ expectedInstance := types.Instance{
Name: fmt.Sprintf("%s_redis-master", session.Id[:8]), Name: fmt.Sprintf("%s_redis-master", session.Id[:8]),
Hostname: "redis-master", Hostname: "redis-master",
IP: "10.0.0.1", IP: "10.0.0.1",
Alias: "", Alias: "",
Image: "redis", Image: "redis",
IsDockerHost: false, IsDockerHost: false,
session: session, Session: session,
} }
assert.Equal(t, expectedInstance, *instance) assert.Equal(t, expectedInstance, *instance)

View File

@@ -2,10 +2,11 @@ package pwd
import ( import (
"io" "io"
"sync"
"time" "time"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/play-with-docker/play-with-docker/storage"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@@ -34,67 +35,60 @@ func observeAction(action string, start time.Time) {
latencyHistogramVec.WithLabelValues(action).Observe(float64(time.Since(start).Nanoseconds()) / 1000000) latencyHistogramVec.WithLabelValues(action).Observe(float64(time.Since(start).Nanoseconds()) / 1000000)
} }
var sessions map[string]*Session
var sessionsMutex sync.Mutex
func init() { func init() {
prometheus.MustRegister(sessionsGauge) prometheus.MustRegister(sessionsGauge)
prometheus.MustRegister(clientsGauge) prometheus.MustRegister(clientsGauge)
prometheus.MustRegister(instancesGauge) prometheus.MustRegister(instancesGauge)
prometheus.MustRegister(latencyHistogramVec) prometheus.MustRegister(latencyHistogramVec)
sessions = make(map[string]*Session)
} }
type pwd struct { type pwd struct {
docker docker.DockerApi docker docker.DockerApi
tasks SchedulerApi tasks SchedulerApi
broadcast BroadcastApi broadcast BroadcastApi
storage StorageApi storage storage.StorageApi
} }
type PWDApi interface { type PWDApi interface {
SessionNew(duration time.Duration, stack string, stackName, imageName string) (*Session, error) SessionNew(duration time.Duration, stack string, stackName, imageName string) (*types.Session, error)
SessionClose(session *Session) error SessionClose(session *types.Session) error
SessionGetSmallestViewPort(session *Session) ViewPort SessionGetSmallestViewPort(session *types.Session) types.ViewPort
SessionDeployStack(session *Session) error SessionDeployStack(session *types.Session) error
SessionGet(id string) *Session SessionGet(id string) *types.Session
SessionLoadAndPrepare() error SessionSetup(session *types.Session, conf SessionSetupConf) error
SessionSetup(session *Session, conf SessionSetupConf) error
InstanceNew(session *Session, conf InstanceConfig) (*Instance, error) InstanceNew(session *types.Session, conf InstanceConfig) (*types.Instance, error)
InstanceResizeTerminal(instance *Instance, cols, rows uint) error InstanceResizeTerminal(instance *types.Instance, cols, rows uint) error
InstanceAttachTerminal(instance *Instance) error InstanceAttachTerminal(instance *types.Instance) error
InstanceUploadFromUrl(instance *Instance, url string) error InstanceUploadFromUrl(instance *types.Instance, url string) error
InstanceUploadFromReader(instance *Instance, filename string, reader io.Reader) error InstanceUploadFromReader(instance *types.Instance, filename string, reader io.Reader) error
InstanceGet(session *Session, name string) *Instance InstanceGet(session *types.Session, name string) *types.Instance
InstanceFindByIP(ip string) *Instance InstanceFindByIP(ip string) *types.Instance
InstanceFindByAlias(sessionPrefix, alias string) *Instance InstanceFindByAlias(sessionPrefix, alias string) *types.Instance
InstanceFindByIPAndSession(sessionPrefix, ip string) *Instance InstanceFindByIPAndSession(sessionPrefix, ip string) *types.Instance
InstanceDelete(session *Session, instance *Instance) error InstanceDelete(session *types.Session, instance *types.Instance) error
InstanceWriteToTerminal(instance *Instance, data string) InstanceWriteToTerminal(instance *types.Instance, data string)
InstanceAllowedImages() []string InstanceAllowedImages() []string
InstanceExec(instance *Instance, cmd []string) (int, error) InstanceExec(instance *types.Instance, cmd []string) (int, error)
ClientNew(id string, session *Session) *Client ClientNew(id string, session *types.Session) *types.Client
ClientResizeViewPort(client *Client, cols, rows uint) ClientResizeViewPort(client *types.Client, cols, rows uint)
ClientClose(client *Client) ClientClose(client *types.Client)
} }
func NewPWD(d docker.DockerApi, t SchedulerApi, b BroadcastApi, s StorageApi) *pwd { func NewPWD(d docker.DockerApi, t SchedulerApi, b BroadcastApi, s storage.StorageApi) *pwd {
return &pwd{docker: d, tasks: t, broadcast: b, storage: s} return &pwd{docker: d, tasks: t, broadcast: b, storage: s}
} }
func setGauges() { func (p *pwd) setGauges() {
var ins float64 s, _ := p.storage.SessionCount()
var cli float64 ses := float64(s)
i, _ := p.storage.InstanceCount()
for _, s := range sessions { ins := float64(i)
ins += float64(len(s.Instances)) c, _ := p.storage.ClientCount()
cli += float64(len(s.clients)) cli := float64(c)
}
clientsGauge.Set(cli) clientsGauge.Set(cli)
instancesGauge.Set(ins) instancesGauge.Set(ins)
sessionsGauge.Set(float64(len(sessions))) sessionsGauge.Set(ses)
} }

View File

@@ -11,6 +11,7 @@ import (
"github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/config"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/twinj/uuid" "github.com/twinj/uuid"
) )
@@ -35,33 +36,12 @@ type SessionSetupInstanceConf struct {
IsSwarmWorker bool `json:"is_swarm_worker"` IsSwarmWorker bool `json:"is_swarm_worker"`
} }
type Session struct { func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName string) (*types.Session, error) {
rw sync.Mutex
Id string `json:"id"`
Instances map[string]*Instance `json:"instances"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
PwdIpAddress string `json:"pwd_ip_address"`
Ready bool `json:"ready"`
Stack string `json:"stack"`
StackName string `json:"stack_name"`
ImageName string `json:"image_name"`
Host string `json:"host"`
closingTimer *time.Timer `json:"-"`
scheduled bool `json:"-"`
clients []*Client `json:"-"`
ticker *time.Ticker `json:"-"`
}
func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName string) (*Session, error) {
defer observeAction("SessionNew", time.Now()) defer observeAction("SessionNew", time.Now())
sessionsMutex.Lock() s := &types.Session{}
defer sessionsMutex.Unlock()
s := &Session{}
s.Id = uuid.NewV4().String() s.Id = uuid.NewV4().String()
s.Instances = map[string]*Instance{} s.Instances = map[string]*types.Instance{}
s.CreatedAt = time.Now() s.CreatedAt = time.Now()
s.ExpiresAt = s.CreatedAt.Add(duration) s.ExpiresAt = s.CreatedAt.Add(duration)
s.Ready = true s.Ready = true
@@ -89,24 +69,24 @@ func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName str
return nil, err return nil, err
} }
sessions[s.Id] = s if err := p.storage.SessionPut(s); err != nil {
if err := p.storage.Save(); err != nil {
log.Println(err) log.Println(err)
return nil, err return nil, err
} }
setGauges() p.setGauges()
return s, nil return s, nil
} }
func (p *pwd) SessionClose(s *Session) error { func (p *pwd) SessionClose(s *types.Session) error {
s.rw.Lock() defer observeAction("SessionClose", time.Now())
defer s.rw.Unlock()
s.Lock()
defer s.Unlock()
s.StopTicker()
if s.ticker != nil {
s.ticker.Stop()
}
p.broadcast.BroadcastTo(s.Id, "session end") p.broadcast.BroadcastTo(s.Id, "session end")
p.broadcast.BroadcastTo(s.Id, "disconnect") p.broadcast.BroadcastTo(s.Id, "disconnect")
log.Printf("Starting clean up of session [%s]\n", s.Id) log.Printf("Starting clean up of session [%s]\n", s.Id)
@@ -131,32 +111,35 @@ func (p *pwd) SessionClose(s *Session) error {
return err return err
} }
} }
delete(sessions, s.Id)
// We store sessions as soon as we delete one err := p.storage.SessionDelete(s.Id)
if err := p.storage.Save(); err != nil { if err != nil {
return err return err
} }
setGauges()
log.Printf("Cleaned up session [%s]\n", s.Id) log.Printf("Cleaned up session [%s]\n", s.Id)
p.setGauges()
return nil return nil
} }
func (p *pwd) SessionGetSmallestViewPort(s *Session) ViewPort { func (p *pwd) SessionGetSmallestViewPort(s *types.Session) types.ViewPort {
minRows := s.clients[0].viewPort.Rows defer observeAction("SessionGetSmallestViewPort", time.Now())
minCols := s.clients[0].viewPort.Cols
for _, c := range s.clients { minRows := s.Clients[0].ViewPort.Rows
minRows = uint(math.Min(float64(minRows), float64(c.viewPort.Rows))) minCols := s.Clients[0].ViewPort.Cols
minCols = uint(math.Min(float64(minCols), float64(c.viewPort.Cols)))
for _, c := range s.Clients {
minRows = uint(math.Min(float64(minRows), float64(c.ViewPort.Rows)))
minCols = uint(math.Min(float64(minCols), float64(c.ViewPort.Cols)))
} }
return ViewPort{Rows: minRows, Cols: minCols} return types.ViewPort{Rows: minRows, Cols: minCols}
} }
func (p *pwd) SessionDeployStack(s *Session) error { func (p *pwd) SessionDeployStack(s *types.Session) error {
defer observeAction("SessionDeployStack", time.Now()) defer observeAction("SessionDeployStack", time.Now())
if s.Ready { if s.Ready {
// a stack was already deployed on this session, just ignore // a stack was already deployed on this session, just ignore
return nil return nil
@@ -189,60 +172,28 @@ func (p *pwd) SessionDeployStack(s *Session) error {
log.Printf("Stack execution finished with code %d\n", code) log.Printf("Stack execution finished with code %d\n", code)
s.Ready = true s.Ready = true
p.broadcast.BroadcastTo(s.Id, "session ready", true) p.broadcast.BroadcastTo(s.Id, "session ready", true)
if err := p.storage.Save(); err != nil { if err := p.storage.SessionPut(s); err != nil {
return err return err
} }
return nil return nil
} }
func (p *pwd) SessionGet(sessionId string) *Session { func (p *pwd) SessionGet(sessionId string) *types.Session {
defer observeAction("SessionGet", time.Now()) defer observeAction("SessionGet", time.Now())
s := sessions[sessionId]
s, _ := p.storage.SessionGet(sessionId)
if err := p.prepareSession(s); err != nil {
log.Println(err)
return nil
}
return s return s
} }
func (p *pwd) SessionLoadAndPrepare() error { func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error {
defer observeAction("SessionLoadAndPrepare", time.Now())
err := p.storage.Load()
if err != nil {
return err
}
wg := sync.WaitGroup{}
for _, s := range sessions {
// Connect PWD daemon to the new network
if s.PwdIpAddress == "" {
log.Printf("Cannot load store session [%s] as they don't have the pwd ip address stored with them\n", s.Id)
continue
}
wg.Add(1)
go func(s *Session) {
s.rw.Lock()
defer s.rw.Unlock()
defer wg.Done()
err := p.prepareSession(s)
if err != nil {
log.Println(err)
}
for _, i := range s.Instances {
// wire the session back to the instance
i.session = s
go p.InstanceAttachTerminal(i)
}
}(s)
}
wg.Wait()
setGauges()
return nil
}
func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
defer observeAction("SessionSetup", time.Now()) defer observeAction("SessionSetup", time.Now())
var tokens *docker.SwarmTokens = nil var tokens *docker.SwarmTokens = nil
var firstSwarmManager *Instance = nil var firstSwarmManager *types.Instance = nil
// first look for a swarm manager and create it // first look for a swarm manager and create it
for _, conf := range conf.Instances { for _, conf := range conf.Instances {
@@ -256,14 +207,14 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
if err != nil { if err != nil {
return err return err
} }
if i.docker == nil { if i.Docker == nil {
dock, err := p.docker.New(i.IP, i.Cert, i.Key) dock, err := p.docker.New(i.IP, i.Cert, i.Key)
if err != nil { if err != nil {
return err return err
} }
i.docker = dock i.Docker = dock
} }
tkns, err := i.docker.SwarmInit() tkns, err := i.Docker.SwarmInit()
if err != nil { if err != nil {
return err return err
} }
@@ -292,13 +243,13 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
} }
if c.IsSwarmManager || c.IsSwarmWorker { if c.IsSwarmManager || c.IsSwarmWorker {
// check if we have connection to the daemon, if not, create it // check if we have connection to the daemon, if not, create it
if i.docker == nil { if i.Docker == nil {
dock, err := p.docker.New(i.IP, i.Cert, i.Key) dock, err := p.docker.New(i.IP, i.Cert, i.Key)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
i.docker = dock i.Docker = dock
} }
} }
@@ -306,7 +257,7 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
if c.IsSwarmManager { if c.IsSwarmManager {
// this is a swarm manager // this is a swarm manager
// cluster has already been initiated, join as manager // cluster has already been initiated, join as manager
err := i.docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager) err := i.Docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
@@ -314,7 +265,7 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
} }
if c.IsSwarmWorker { if c.IsSwarmWorker {
// this is a swarm worker // this is a swarm worker
err := i.docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker) err := i.Docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
@@ -332,7 +283,14 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
// This function should be called any time a session needs to be prepared: // This function should be called any time a session needs to be prepared:
// 1. Like when it is created // 1. Like when it is created
// 2. When it was loaded from storage // 2. When it was loaded from storage
func (p *pwd) prepareSession(session *Session) error { func (p *pwd) prepareSession(session *types.Session) error {
session.Lock()
defer session.Unlock()
if session.IsPrepared() {
return nil
}
p.scheduleSessionClose(session) p.scheduleSessionClose(session)
// Connect PWD daemon to the new network // Connect PWD daemon to the new network
@@ -343,17 +301,24 @@ func (p *pwd) prepareSession(session *Session) error {
// Schedule periodic tasks // Schedule periodic tasks
p.tasks.Schedule(session) p.tasks.Schedule(session)
for _, i := range session.Instances {
// wire the session back to the instance
i.Session = session
go p.InstanceAttachTerminal(i)
}
session.SetPrepared()
return nil return nil
} }
func (p *pwd) scheduleSessionClose(s *Session) { func (p *pwd) scheduleSessionClose(s *types.Session) {
timeLeft := s.ExpiresAt.Sub(time.Now()) timeLeft := s.ExpiresAt.Sub(time.Now())
s.closingTimer = time.AfterFunc(timeLeft, func() { s.SetClosingTimer(time.AfterFunc(timeLeft, func() {
p.SessionClose(s) p.SessionClose(s)
}) }))
} }
func (p *pwd) connectToNetwork(s *Session) error { func (p *pwd) connectToNetwork(s *types.Session) error {
ip, err := p.docker.ConnectNetwork(config.PWDContainerName, s.Id, s.PwdIpAddress) ip, err := p.docker.ConnectNetwork(config.PWDContainerName, s.Id, s.PwdIpAddress)
if err != nil { if err != nil {
log.Println("ERROR NETWORKING") log.Println("ERROR NETWORKING")

View File

@@ -2,24 +2,21 @@ package pwd
import ( import (
"fmt" "fmt"
"net"
"sync"
"testing" "testing"
"time" "time"
"github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/config"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestSessionNew(t *testing.T) { func TestSessionNew(t *testing.T) {
sessions = map[string]*Session{}
config.PWDContainerName = "pwd" config.PWDContainerName = "pwd"
var connectContainerName, connectNetworkName, connectIP string var connectContainerName, connectNetworkName, connectIP string
createdNetworkId := "" createdNetworkId := ""
saveCalled := false saveCalled := false
expectedSessions := map[string]*Session{} expectedSessions := map[string]*types.Session{}
docker := &mockDocker{} docker := &mockDocker{}
docker.createNetwork = func(id string) error { docker.createNetwork = func(id string) error {
@@ -33,15 +30,15 @@ func TestSessionNew(t *testing.T) {
return "10.0.0.1", nil return "10.0.0.1", nil
} }
var scheduledSession *Session var scheduledSession *types.Session
tasks := &mockTasks{} tasks := &mockTasks{}
tasks.schedule = func(s *Session) { tasks.schedule = func(s *types.Session) {
scheduledSession = s scheduledSession = s
} }
broadcast := &mockBroadcast{} broadcast := &mockBroadcast{}
storage := &mockStorage{} storage := &mockStorage{}
storage.save = func() error { storage.sessionPut = func(s *types.Session) error {
saveCalled = true saveCalled = true
return nil return nil
} }
@@ -72,7 +69,7 @@ func TestSessionNew(t *testing.T) {
assert.Equal(t, "imageName", s.ImageName) assert.Equal(t, "imageName", s.ImageName)
assert.False(t, s.Ready) assert.False(t, s.Ready)
assert.NotNil(t, s.closingTimer) assert.NotNil(t, s.ClosingTimer())
assert.Equal(t, config.PWDContainerName, connectContainerName) assert.Equal(t, config.PWDContainerName, connectContainerName)
assert.Equal(t, s.Id, connectNetworkName) assert.Equal(t, s.Id, connectNetworkName)
@@ -82,7 +79,6 @@ func TestSessionNew(t *testing.T) {
assert.Equal(t, s, scheduledSession) assert.Equal(t, s, scheduledSession)
assert.Equal(t, expectedSessions, sessions)
assert.True(t, saveCalled) assert.True(t, saveCalled)
} }
@@ -208,72 +204,72 @@ func TestSessionSetup(t *testing.T) {
manager1 := fmt.Sprintf("%s_manager1", s.Id[:8]) manager1 := fmt.Sprintf("%s_manager1", s.Id[:8])
manager1Received := *s.Instances[manager1] manager1Received := *s.Instances[manager1]
assert.Equal(t, Instance{ assert.Equal(t, types.Instance{
Name: manager1, Name: manager1,
Image: "franela/dind", Image: "franela/dind",
Hostname: "manager1", Hostname: "manager1",
IP: "10.0.0.1", IP: "10.0.0.1",
Alias: "", Alias: "",
IsDockerHost: true, IsDockerHost: true,
session: s, Session: s,
conn: manager1Received.conn, Terminal: manager1Received.Terminal,
docker: manager1Received.docker, Docker: manager1Received.Docker,
}, manager1Received) }, manager1Received)
manager2 := fmt.Sprintf("%s_manager2", s.Id[:8]) manager2 := fmt.Sprintf("%s_manager2", s.Id[:8])
manager2Received := *s.Instances[manager2] manager2Received := *s.Instances[manager2]
assert.Equal(t, Instance{ assert.Equal(t, types.Instance{
Name: manager2, Name: manager2,
Image: "franela/dind", Image: "franela/dind",
Hostname: "manager2", Hostname: "manager2",
IP: "10.0.0.2", IP: "10.0.0.2",
Alias: "", Alias: "",
IsDockerHost: true, IsDockerHost: true,
session: s, Session: s,
conn: manager2Received.conn, Terminal: manager2Received.Terminal,
docker: manager2Received.docker, Docker: manager2Received.Docker,
}, manager2Received) }, manager2Received)
manager3 := fmt.Sprintf("%s_manager3", s.Id[:8]) manager3 := fmt.Sprintf("%s_manager3", s.Id[:8])
manager3Received := *s.Instances[manager3] manager3Received := *s.Instances[manager3]
assert.Equal(t, Instance{ assert.Equal(t, types.Instance{
Name: manager3, Name: manager3,
Image: "franela/dind:overlay2-dev", Image: "franela/dind:overlay2-dev",
Hostname: "manager3", Hostname: "manager3",
IP: "10.0.0.3", IP: "10.0.0.3",
Alias: "", Alias: "",
IsDockerHost: true, IsDockerHost: true,
session: s, Session: s,
conn: manager3Received.conn, Terminal: manager3Received.Terminal,
docker: manager3Received.docker, Docker: manager3Received.Docker,
}, manager3Received) }, manager3Received)
worker1 := fmt.Sprintf("%s_worker1", s.Id[:8]) worker1 := fmt.Sprintf("%s_worker1", s.Id[:8])
worker1Received := *s.Instances[worker1] worker1Received := *s.Instances[worker1]
assert.Equal(t, Instance{ assert.Equal(t, types.Instance{
Name: worker1, Name: worker1,
Image: "franela/dind", Image: "franela/dind",
Hostname: "worker1", Hostname: "worker1",
IP: "10.0.0.4", IP: "10.0.0.4",
Alias: "", Alias: "",
IsDockerHost: true, IsDockerHost: true,
session: s, Session: s,
conn: worker1Received.conn, Terminal: worker1Received.Terminal,
docker: worker1Received.docker, Docker: worker1Received.Docker,
}, worker1Received) }, worker1Received)
other := fmt.Sprintf("%s_other", s.Id[:8]) other := fmt.Sprintf("%s_other", s.Id[:8])
otherReceived := *s.Instances[other] otherReceived := *s.Instances[other]
assert.Equal(t, Instance{ assert.Equal(t, types.Instance{
Name: other, Name: other,
Image: "franela/dind", Image: "franela/dind",
Hostname: "other", Hostname: "other",
IP: "10.0.0.5", IP: "10.0.0.5",
Alias: "", Alias: "",
IsDockerHost: true, IsDockerHost: true,
session: s, Session: s,
conn: otherReceived.conn, Terminal: otherReceived.Terminal,
docker: otherReceived.docker, Docker: otherReceived.Docker,
}, otherReceived) }, otherReceived)
assert.True(t, swarmInitOnMaster1) assert.True(t, swarmInitOnMaster1)
@@ -281,100 +277,3 @@ func TestSessionSetup(t *testing.T) {
assert.True(t, manager3JoinedHasManager) assert.True(t, manager3JoinedHasManager)
assert.True(t, worker1JoinedHasWorker) assert.True(t, worker1JoinedHasWorker)
} }
func TestSessionLoadAndPrepare(t *testing.T) {
config.PWDContainerName = "pwd"
lock := sync.Mutex{}
var s1NetworkConnect []string
var s2NetworkConnect []string
wg := sync.WaitGroup{}
wg.Add(3)
connectedInstances := []string{}
sessions = map[string]*Session{}
i1 := &Instance{
Image: "dind",
Name: "session1_i1",
Hostname: "i1",
IP: "10.0.0.10",
IsDockerHost: true,
}
i2 := &Instance{
Image: "dind",
Name: "session1_i2",
Hostname: "i1",
IP: "10.0.0.11",
IsDockerHost: true,
}
i3 := &Instance{
Image: "dind",
Name: "session1_i3",
Hostname: "i1",
IP: "10.0.0.12",
IsDockerHost: true,
}
s1 := &Session{
Id: "session1",
Instances: map[string]*Instance{"session1_i1": i1},
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(time.Hour),
PwdIpAddress: "10.0.0.1",
Ready: true,
Stack: "",
StackName: "",
}
s2 := &Session{
Id: "session2",
Instances: map[string]*Instance{"session1_i2": i2, "session1_i3": i3},
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(time.Hour),
PwdIpAddress: "10.0.0.2",
Ready: true,
Stack: "",
StackName: "",
}
dock := &mockDocker{}
dock.createAttachConnection = func(instanceName string) (net.Conn, error) {
lock.Lock()
defer lock.Unlock()
connectedInstances = append(connectedInstances, instanceName)
wg.Done()
return &mockConn{}, nil
}
dock.connectNetwork = func(container, network, ip string) (string, error) {
if s1.Id == network {
s1NetworkConnect = []string{container, network, ip}
} else if s2.Id == network {
s2NetworkConnect = []string{container, network, ip}
}
return ip, nil
}
tasks := &mockTasks{}
tasks.schedule = func(s *Session) {
s.ticker = time.NewTicker(1 * time.Second)
}
broadcast := &mockBroadcast{}
storage := &mockStorage{}
storage.load = func() error {
sessions = map[string]*Session{"session1": s1, "session2": s2}
return nil
}
p := NewPWD(dock, tasks, broadcast, storage)
err := p.SessionLoadAndPrepare()
assert.Nil(t, err)
assert.Len(t, sessions, 2)
assert.NotNil(t, s1.closingTimer)
assert.NotNil(t, s2.closingTimer)
assert.NotNil(t, s1.ticker)
assert.NotNil(t, s2.ticker)
assert.Equal(t, []string{"pwd", s1.Id, s1.PwdIpAddress}, s1NetworkConnect)
assert.Equal(t, []string{"pwd", s2.Id, s2.PwdIpAddress}, s2NetworkConnect)
wg.Wait()
assert.Subset(t, connectedInstances, []string{i1.Name, i2.Name, i3.Name})
}

View File

@@ -1,50 +0,0 @@
package pwd
import (
"encoding/gob"
"os"
"sync"
"github.com/play-with-docker/play-with-docker/config"
)
type StorageApi interface {
Save() error
Load() error
}
type storage struct {
rw sync.Mutex
}
func (store *storage) Load() error {
file, err := os.Open(config.SessionsFile)
if err == nil {
decoder := gob.NewDecoder(file)
err = decoder.Decode(&sessions)
if err != nil {
return err
}
}
file.Close()
return nil
}
func (store *storage) Save() error {
store.rw.Lock()
defer store.rw.Unlock()
file, err := os.Create(config.SessionsFile)
if err == nil {
encoder := gob.NewEncoder(file)
err = encoder.Encode(&sessions)
}
file.Close()
return nil
}
func NewStorage() *storage {
return &storage{}
}

View File

@@ -1,19 +1,70 @@
package pwd package pwd
import "github.com/play-with-docker/play-with-docker/pwd/types"
type mockStorage struct { type mockStorage struct {
save func() error sessionGet func(sessionId string) (*types.Session, error)
load func() error sessionPut func(s *types.Session) error
sessionCount func() (int, error)
sessionDelete func(sessionId string) error
instanceFindByAlias func(sessionPrefix, alias string) (*types.Instance, error)
instanceFindByIP func(ip string) (*types.Instance, error)
instanceFindByIPAndSession func(sessionPrefix, ip string) (*types.Instance, error)
instanceCount func() (int, error)
clientCount func() (int, error)
} }
func (m *mockStorage) Save() error { func (m *mockStorage) SessionGet(sessionId string) (*types.Session, error) {
if m.save != nil { if m.sessionGet != nil {
return m.save() return m.sessionGet(sessionId)
}
return nil, nil
}
func (m *mockStorage) SessionPut(s *types.Session) error {
if m.sessionPut != nil {
return m.sessionPut(s)
} }
return nil return nil
} }
func (m *mockStorage) Load() error { func (m *mockStorage) SessionCount() (int, error) {
if m.load != nil { if m.sessionCount != nil {
return m.load() return m.sessionCount()
}
return 0, nil
}
func (m *mockStorage) SessionDelete(sessionId string) error {
if m.sessionDelete != nil {
return m.sessionDelete(sessionId)
} }
return nil return nil
} }
func (m *mockStorage) InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) {
if m.instanceFindByAlias != nil {
return m.instanceFindByAlias(sessionPrefix, alias)
}
return nil, nil
}
func (m *mockStorage) InstanceFindByIP(ip string) (*types.Instance, error) {
if m.instanceFindByIP != nil {
return m.instanceFindByIP(ip)
}
return nil, nil
}
func (m *mockStorage) InstanceFindByIPAndSession(sessionPrefix, ip string) (*types.Instance, error) {
if m.instanceFindByIPAndSession != nil {
return m.instanceFindByIPAndSession(sessionPrefix, ip)
}
return nil, nil
}
func (m *mockStorage) InstanceCount() (int, error) {
if m.instanceCount != nil {
return m.instanceCount()
}
return 0, nil
}
func (m *mockStorage) ClientCount() (int, error) {
if m.clientCount != nil {
return m.clientCount()
}
return 0, nil
}

View File

@@ -15,15 +15,16 @@ import (
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/docker/go-connections/tlsconfig" "github.com/docker/go-connections/tlsconfig"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
) )
type periodicTask interface { type periodicTask interface {
Run(i *Instance) error Run(i *types.Instance) error
} }
type SchedulerApi interface { type SchedulerApi interface {
Schedule(session *Session) Schedule(session *types.Session)
Unschedule(session *Session) Unschedule(session *types.Session)
} }
type scheduler struct { type scheduler struct {
@@ -31,21 +32,20 @@ type scheduler struct {
periodicTasks []periodicTask periodicTasks []periodicTask
} }
func (sch *scheduler) Schedule(s *Session) { func (sch *scheduler) Schedule(s *types.Session) {
if s.scheduled { if s.IsPrepared() {
return return
} }
go func() { go func() {
s.scheduled = true t := time.NewTicker(1 * time.Second)
s.SetTicker(t)
s.ticker = time.NewTicker(1 * time.Second) for range t.C {
for range s.ticker.C {
var wg = sync.WaitGroup{} var wg = sync.WaitGroup{}
wg.Add(len(s.Instances)) wg.Add(len(s.Instances))
for _, ins := range s.Instances { for _, ins := range s.Instances {
var i *Instance = ins var i *types.Instance = ins
if i.docker == nil && i.IsDockerHost { if i.Docker == nil && i.IsDockerHost {
// Need to create client to the DinD docker daemon // Need to create client to the DinD docker daemon
// We check if the client needs to use TLS // We check if the client needs to use TLS
@@ -76,7 +76,7 @@ func (sch *scheduler) Schedule(s *Session) {
if err != nil { if err != nil {
log.Println("Could not connect to DinD docker daemon", err) log.Println("Could not connect to DinD docker daemon", err)
} else { } else {
i.docker = docker.NewDocker(c) i.Docker = docker.NewDocker(c)
} }
} }
go func() { go func() {
@@ -98,17 +98,17 @@ func (sch *scheduler) Schedule(s *Session) {
wg.Wait() wg.Wait()
// broadcast all information // broadcast all information
for _, ins := range s.Instances { for _, ins := range s.Instances {
ins.Ports = UInt16Slice(ins.tempPorts) ins.Ports = types.UInt16Slice(ins.GetUsedPorts())
sort.Sort(ins.Ports) sort.Sort(ins.Ports)
ins.tempPorts = []uint16{} ins.CleanUsedPorts()
sch.broadcast.BroadcastTo(ins.session.Id, "instance stats", ins.Name, ins.Mem, ins.Cpu, ins.IsManager, ins.Ports) sch.broadcast.BroadcastTo(ins.Session.Id, "instance stats", ins.Name, ins.Mem, ins.Cpu, ins.IsManager, ins.Ports)
} }
} }
}() }()
} }
func (sch *scheduler) Unschedule(s *Session) { func (sch *scheduler) Unschedule(s *types.Session) {
} }
func NewScheduler(b BroadcastApi, d docker.DockerApi) *scheduler { func NewScheduler(b BroadcastApi, d docker.DockerApi) *scheduler {

View File

@@ -1,16 +1,18 @@
package pwd package pwd
import "github.com/play-with-docker/play-with-docker/pwd/types"
type mockTasks struct { type mockTasks struct {
schedule func(s *Session) schedule func(s *types.Session)
unschedule func(s *Session) unschedule func(s *types.Session)
} }
func (m *mockTasks) Schedule(s *Session) { func (m *mockTasks) Schedule(s *types.Session) {
if m.schedule != nil { if m.schedule != nil {
m.schedule(s) m.schedule(s)
} }
} }
func (m *mockTasks) Unschedule(s *Session) { func (m *mockTasks) Unschedule(s *types.Session) {
if m.unschedule != nil { if m.unschedule != nil {
m.unschedule(s) m.unschedule(s)
} }

12
pwd/types/client.go Normal file
View File

@@ -0,0 +1,12 @@
package types
type Client struct {
Id string
ViewPort ViewPort
Session *Session
}
type ViewPort struct {
Rows uint
Cols uint
}

63
pwd/types/instance.go Normal file
View File

@@ -0,0 +1,63 @@
package types
import (
"context"
"net"
"sync"
"github.com/play-with-docker/play-with-docker/docker"
)
type UInt16Slice []uint16
func (p UInt16Slice) Len() int { return len(p) }
func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type Instance struct {
Image string `json:"image"`
Name string `json:"name"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
IsManager *bool `json:"is_manager"`
Mem string `json:"mem"`
Cpu string `json:"cpu"`
Alias string `json:"alias"`
ServerCert []byte `json:"server_cert"`
ServerKey []byte `json:"server_key"`
CACert []byte `json:"ca_cert"`
Cert []byte `json:"cert"`
Key []byte `json:"key"`
IsDockerHost bool `json:"is_docker_host"`
Docker docker.DockerApi `json:"-"`
Session *Session `json:"-"`
Terminal net.Conn `json:"-"`
ctx context.Context `json:"-"`
tempPorts []uint16 `json:"-"`
Ports UInt16Slice
rw sync.Mutex
}
func (i *Instance) SetUsedPort(port uint16) {
i.rw.Lock()
defer i.rw.Unlock()
for _, p := range i.tempPorts {
if p == port {
return
}
}
i.tempPorts = append(i.tempPorts, port)
}
func (i *Instance) GetUsedPorts() []uint16 {
i.rw.Lock()
defer i.rw.Unlock()
return i.tempPorts
}
func (i *Instance) CleanUsedPorts() {
i.rw.Lock()
defer i.rw.Unlock()
i.tempPorts = []uint16{}
}

56
pwd/types/session.go Normal file
View File

@@ -0,0 +1,56 @@
package types
import (
"sync"
"time"
)
type Session struct {
Id string `json:"id"`
Instances map[string]*Instance `json:"instances"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
PwdIpAddress string `json:"pwd_ip_address"`
Ready bool `json:"ready"`
Stack string `json:"stack"`
StackName string `json:"stack_name"`
ImageName string `json:"image_name"`
Host string `json:"host"`
Clients []*Client `json:"-"`
closingTimer *time.Timer `json:"-"`
scheduled bool `json:"-"`
ticker *time.Ticker `json:"-"`
rw sync.Mutex `json:"-"`
prepared bool `json:"-"`
}
func (s *Session) Lock() {
s.rw.Lock()
}
func (s *Session) Unlock() {
s.rw.Unlock()
}
func (s *Session) StopTicker() {
if s.ticker != nil {
s.ticker.Stop()
}
}
func (s *Session) SetTicker(t *time.Ticker) {
s.ticker = t
}
func (s *Session) SetClosingTimer(t *time.Timer) {
s.closingTimer = t
}
func (s *Session) ClosingTimer() *time.Timer {
return s.closingTimer
}
func (s *Session) IsPrepared() bool {
return s.prepared
}
func (s *Session) SetPrepared() {
s.prepared = true
}

167
storage/file.go Normal file
View File

@@ -0,0 +1,167 @@
package storage
import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"github.com/play-with-docker/play-with-docker/pwd/types"
)
type storage struct {
rw sync.Mutex
path string
db map[string]*types.Session
}
func (store *storage) SessionGet(sessionId string) (*types.Session, error) {
store.rw.Lock()
defer store.rw.Unlock()
s, found := store.db[sessionId]
if !found {
return nil, fmt.Errorf("%s", notFound)
}
return s, nil
}
func (store *storage) SessionPut(s *types.Session) error {
store.rw.Lock()
defer store.rw.Unlock()
store.db[s.Id] = s
return store.save()
}
func (store *storage) InstanceFindByIP(ip string) (*types.Instance, error) {
store.rw.Lock()
defer store.rw.Unlock()
for _, s := range store.db {
for _, i := range s.Instances {
if i.IP == ip {
return i, nil
}
}
}
return nil, fmt.Errorf("%s", notFound)
}
func (store *storage) InstanceFindByIPAndSession(sessionPrefix, ip string) (*types.Instance, error) {
store.rw.Lock()
defer store.rw.Unlock()
for id, s := range store.db {
if strings.HasPrefix(id, sessionPrefix) {
for _, i := range s.Instances {
if i.IP == ip {
return i, nil
}
}
}
}
return nil, fmt.Errorf("%s", notFound)
}
func (store *storage) InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) {
store.rw.Lock()
defer store.rw.Unlock()
for id, s := range store.db {
if strings.HasPrefix(id, sessionPrefix) {
for _, i := range s.Instances {
if i.Alias == alias {
return i, nil
}
}
}
}
return nil, fmt.Errorf("%s", notFound)
}
func (store *storage) SessionCount() (int, error) {
store.rw.Lock()
defer store.rw.Unlock()
return len(store.db), nil
}
func (store *storage) InstanceCount() (int, error) {
store.rw.Lock()
defer store.rw.Unlock()
var ins int
for _, s := range store.db {
ins += len(s.Instances)
}
return ins, nil
}
func (store *storage) ClientCount() (int, error) {
store.rw.Lock()
defer store.rw.Unlock()
var cli int
for _, s := range store.db {
cli += len(s.Clients)
}
return cli, nil
}
func (store *storage) SessionDelete(sessionId string) error {
store.rw.Lock()
defer store.rw.Unlock()
delete(store.db, sessionId)
return store.save()
}
func (store *storage) load() error {
file, err := os.Open(store.path)
if err == nil {
decoder := json.NewDecoder(file)
err = decoder.Decode(&store.db)
if err != nil {
return err
}
} else {
store.db = map[string]*types.Session{}
}
file.Close()
return nil
}
func (store *storage) save() error {
file, err := os.Create(store.path)
if err != nil {
return err
}
defer file.Close()
encoder := json.NewEncoder(file)
err = encoder.Encode(&store.db)
return err
}
func NewFileStorage(path string) (StorageApi, error) {
s := &storage{path: path}
err := s.load()
if err != nil {
return nil, err
}
return s, nil
}

261
storage/file_test.go Normal file
View File

@@ -0,0 +1,261 @@
package storage
import (
"encoding/json"
"io/ioutil"
"log"
"os"
"testing"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/stretchr/testify/assert"
)
func TestSessionPut(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil {
log.Fatal(err)
}
tmpfile.Close()
os.Remove(tmpfile.Name())
defer os.Remove(tmpfile.Name())
storage, err := NewFileStorage(tmpfile.Name())
assert.Nil(t, err)
s := &types.Session{Id: "a session"}
err = storage.SessionPut(s)
assert.Nil(t, err)
var loadedSessions map[string]*types.Session
expectedSessions := map[string]*types.Session{}
expectedSessions[s.Id] = s
file, err := os.Open(tmpfile.Name())
assert.Nil(t, err)
defer file.Close()
decoder := json.NewDecoder(file)
err = decoder.Decode(&loadedSessions)
assert.Nil(t, err)
assert.EqualValues(t, expectedSessions, loadedSessions)
}
func TestSessionGet(t *testing.T) {
expectedSession := &types.Session{Id: "session1"}
sessions := map[string]*types.Session{}
sessions[expectedSession.Id] = expectedSession
tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil {
log.Fatal(err)
}
encoder := json.NewEncoder(tmpfile)
err = encoder.Encode(&sessions)
assert.Nil(t, err)
tmpfile.Close()
defer os.Remove(tmpfile.Name())
storage, err := NewFileStorage(tmpfile.Name())
assert.Nil(t, err)
_, err = storage.SessionGet("bad id")
assert.True(t, NotFound(err))
loadedSession, err := storage.SessionGet("session1")
assert.Nil(t, err)
assert.Equal(t, expectedSession, loadedSession)
}
func TestInstanceFindByIP(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil {
log.Fatal(err)
}
tmpfile.Close()
os.Remove(tmpfile.Name())
defer os.Remove(tmpfile.Name())
storage, err := NewFileStorage(tmpfile.Name())
assert.Nil(t, err)
i1 := &types.Instance{Name: "i1", IP: "10.0.0.1"}
i2 := &types.Instance{Name: "i2", IP: "10.1.0.1"}
s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}}
s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}}
err = storage.SessionPut(s1)
assert.Nil(t, err)
err = storage.SessionPut(s2)
assert.Nil(t, err)
foundInstance, err := storage.InstanceFindByIP("10.0.0.1")
assert.Nil(t, err)
assert.Equal(t, i1, foundInstance)
foundInstance, err = storage.InstanceFindByIP("10.1.0.1")
assert.Nil(t, err)
assert.Equal(t, i2, foundInstance)
foundInstance, err = storage.InstanceFindByIP("192.168.0.1")
assert.True(t, NotFound(err))
assert.Nil(t, foundInstance)
}
func TestInstanceFindByIPAndSession(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil {
log.Fatal(err)
}
tmpfile.Close()
os.Remove(tmpfile.Name())
defer os.Remove(tmpfile.Name())
storage, err := NewFileStorage(tmpfile.Name())
assert.Nil(t, err)
i1 := &types.Instance{Name: "i1", IP: "10.0.0.1"}
i2 := &types.Instance{Name: "i2", IP: "10.1.0.1"}
s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}}
s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}}
err = storage.SessionPut(s1)
assert.Nil(t, err)
err = storage.SessionPut(s2)
assert.Nil(t, err)
foundInstance, err := storage.InstanceFindByIPAndSession("session1", "10.0.0.1")
assert.Nil(t, err)
assert.Equal(t, i1, foundInstance)
foundInstance, err = storage.InstanceFindByIPAndSession("session2", "10.1.0.1")
assert.Nil(t, err)
assert.Equal(t, i2, foundInstance)
foundInstance, err = storage.InstanceFindByIPAndSession("session3", "10.1.0.1")
assert.True(t, NotFound(err))
assert.Nil(t, foundInstance)
foundInstance, err = storage.InstanceFindByIPAndSession("session1", "10.1.0.1")
assert.True(t, NotFound(err))
assert.Nil(t, foundInstance)
foundInstance, err = storage.InstanceFindByIPAndSession("session1", "192.168.0.1")
assert.True(t, NotFound(err))
assert.Nil(t, foundInstance)
}
func TestInstanceFindByAlias(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil {
log.Fatal(err)
}
tmpfile.Close()
os.Remove(tmpfile.Name())
defer os.Remove(tmpfile.Name())
storage, err := NewFileStorage(tmpfile.Name())
assert.Nil(t, err)
i1 := &types.Instance{Name: "i1", Alias: "foo", IP: "10.0.0.1"}
i2 := &types.Instance{Name: "i2", Alias: "foo", IP: "10.1.0.1"}
s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}}
s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}}
err = storage.SessionPut(s1)
assert.Nil(t, err)
err = storage.SessionPut(s2)
assert.Nil(t, err)
foundInstance, err := storage.InstanceFindByAlias("session1", "foo")
assert.Nil(t, err)
assert.Equal(t, i1, foundInstance)
foundInstance, err = storage.InstanceFindByAlias("session2", "foo")
assert.Nil(t, err)
assert.Equal(t, i2, foundInstance)
foundInstance, err = storage.InstanceFindByAlias("session1", "bar")
assert.True(t, NotFound(err))
assert.Nil(t, foundInstance)
foundInstance, err = storage.InstanceFindByAlias("session3", "foo")
assert.True(t, NotFound(err))
assert.Nil(t, foundInstance)
}
func TestCounts(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil {
log.Fatal(err)
}
tmpfile.Close()
os.Remove(tmpfile.Name())
defer os.Remove(tmpfile.Name())
storage, err := NewFileStorage(tmpfile.Name())
assert.Nil(t, err)
c1 := &types.Client{}
i1 := &types.Instance{Name: "i1", Alias: "foo", IP: "10.0.0.1"}
i2 := &types.Instance{Name: "i2", Alias: "foo", IP: "10.1.0.1"}
s1 := &types.Session{Id: "session1", Instances: map[string]*types.Instance{"i1": i1}}
s2 := &types.Session{Id: "session2", Instances: map[string]*types.Instance{"i2": i2}}
s3 := &types.Session{Id: "session3", Clients: []*types.Client{c1}}
err = storage.SessionPut(s1)
assert.Nil(t, err)
err = storage.SessionPut(s2)
assert.Nil(t, err)
err = storage.SessionPut(s3)
assert.Nil(t, err)
num, err := storage.SessionCount()
assert.Nil(t, err)
assert.Equal(t, 3, num)
num, err = storage.InstanceCount()
assert.Nil(t, err)
assert.Equal(t, 2, num)
num, err = storage.ClientCount()
assert.Nil(t, err)
assert.Equal(t, 1, num)
}
func TestSessionDelete(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil {
log.Fatal(err)
}
tmpfile.Close()
os.Remove(tmpfile.Name())
defer os.Remove(tmpfile.Name())
storage, err := NewFileStorage(tmpfile.Name())
assert.Nil(t, err)
s1 := &types.Session{Id: "session1"}
err = storage.SessionPut(s1)
assert.Nil(t, err)
found, err := storage.SessionGet(s1.Id)
assert.Nil(t, err)
assert.Equal(t, s1, found)
err = storage.SessionDelete(s1.Id)
assert.Nil(t, err)
found, err = storage.SessionGet(s1.Id)
assert.True(t, NotFound(err))
assert.Nil(t, found)
}

24
storage/storage.go Normal file
View File

@@ -0,0 +1,24 @@
package storage
import "github.com/play-with-docker/play-with-docker/pwd/types"
const notFound = "NotFound"
func NotFound(e error) bool {
return e.Error() == notFound
}
type StorageApi interface {
SessionGet(sessionId string) (*types.Session, error)
SessionPut(*types.Session) error
SessionCount() (int, error)
SessionDelete(sessionId string) error
InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error)
// Should have the session id too, soon
InstanceFindByIP(ip string) (*types.Instance, error)
InstanceFindByIPAndSession(sessionPrefix, ip string) (*types.Instance, error)
InstanceCount() (int, error)
ClientCount() (int, error)
}