Add storage API abstraction

This commit is contained in:
Jonathan Leibiusky @xetorthio
2017-06-22 09:16:49 -03:00
committed by Marcos Lilljedahl
19 changed files with 277 additions and 83 deletions

View File

@@ -41,7 +41,11 @@ ENV DOCKER_STORAGE_DRIVER=$docker_storage_driver
# Move to our home # Move to our home
WORKDIR /root WORKDIR /root
RUN mkdir -p /var/run/pwd/certs && mkdir -p /var/run/pwd/uploads # Setup certs and ssh keys
RUN mkdir -p /var/run/pwd/certs && mkdir -p /var/run/pwd/uploads \
&& ssh-keygen -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key >/dev/null \
&& mkdir ~/.ssh && ssh-keygen -N "" -t rsa -f ~/.ssh/id_rsa \
&& cat ~/.ssh/id_rsa.pub > ~/.ssh/authorized_keys
# Remove IPv6 alias for localhost and start docker in the background ... # Remove IPv6 alias for localhost and start docker in the background ...
CMD cat /etc/hosts >/etc/hosts.bak && \ CMD cat /etc/hosts >/etc/hosts.bak && \
@@ -53,7 +57,7 @@ CMD cat /etc/hosts >/etc/hosts.bak && \
sed -i "s/\DOCKER_TLSCERT/$DOCKER_TLSCERT/" /etc/docker/daemon.json && \ sed -i "s/\DOCKER_TLSCERT/$DOCKER_TLSCERT/" /etc/docker/daemon.json && \
sed -i "s/\DOCKER_TLSKEY/$DOCKER_TLSKEY/" /etc/docker/daemon.json && \ sed -i "s/\DOCKER_TLSKEY/$DOCKER_TLSKEY/" /etc/docker/daemon.json && \
umount /var/lib/docker && mount -t securityfs none /sys/kernel/security && \ umount /var/lib/docker && mount -t securityfs none /sys/kernel/security && \
echo "root:root" | chpasswd &> /dev/null && ssh-keygen -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key >/dev/null && \ echo "root:root" | chpasswd &> /dev/null && \
/usr/sbin/sshd -o PermitRootLogin=yes -o PrintMotd=no 2>/dev/null && \ /usr/sbin/sshd -o PermitRootLogin=yes -o PrintMotd=no 2>/dev/null && \
dockerd &>/docker.log & \ dockerd &>/docker.log & \
while true ; do script -q -c "/bin/bash -l" /dev/null ; done while true ; do script -q -c "/bin/bash -l" /dev/null ; done

View File

@@ -20,7 +20,7 @@ const (
var NameFilter = regexp.MustCompile(PWDHostPortGroupRegex) var NameFilter = regexp.MustCompile(PWDHostPortGroupRegex)
var AliasFilter = regexp.MustCompile(AliasPortGroupRegex) var AliasFilter = regexp.MustCompile(AliasPortGroupRegex)
var SSLPortNumber, PortNumber, Key, Cert, SessionsFile, PWDContainerName, PWDCName, HashKey string var SSLPortNumber, PortNumber, Key, Cert, SessionsFile, PWDContainerName, PWDCName, HashKey
var MaxLoadAvg float64 var MaxLoadAvg float64
func ParseFlags() { func ParseFlags() {

View File

@@ -32,7 +32,7 @@ func Bootstrap() {
s, err := storage.NewFileStorage(config.SessionsFile) s, err := storage.NewFileStorage(config.SessionsFile)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
log.Fatal("Error decoding sessions from disk ", err) log.Fatal("Error initializing StorageAPI: ", err)
} }
core = pwd.NewPWD(d, t, Broadcast, s) core = pwd.NewPWD(d, t, Broadcast, s)

View File

@@ -1,6 +1,7 @@
package handlers package handlers
import ( import (
"io"
"log" "log"
"net/http" "net/http"
@@ -28,18 +29,34 @@ func FileUpload(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
return return
} else { } else {
// This is for multipart upload red, err := req.MultipartReader()
log.Println("Not implemented yet") if err != nil {
log.Println(err)
/* rw.WriteHeader(http.StatusBadRequest)
err := req.ParseMultipartForm(32 << 20) return
}
for {
p, err := red.NextPart()
if err == io.EOF {
break
}
if err != nil { if err != nil {
log.Println(err) log.Println(err)
rw.WriteHeader(http.StatusBadRequest) continue
}
if p.FileName() == "" {
continue
}
err = core.InstanceUploadFromReader(i, p.FileName(), p)
if err != nil {
log.Println(err)
rw.WriteHeader(http.StatusInternalServerError)
return return
} }
*/ log.Printf("Uploaded [%s] to [%s]\n", p.FileName(), i.Name)
rw.WriteHeader(http.StatusInternalServerError) }
rw.WriteHeader(http.StatusOK)
return return
} }

View File

@@ -34,8 +34,7 @@ func WS(so socketio.Socket) {
so.On("terminal in", func(name, data string) { so.On("terminal in", func(name, data string) {
// User wrote something on the terminal. Need to write it to the instance terminal // User wrote something on the terminal. Need to write it to the instance terminal
instance := core.InstanceGet(session, name) core.InstanceWriteToTerminal(session.Id, name, data)
core.InstanceWriteToTerminal(instance, data)
}) })
so.On("viewport resize", func(cols, rows uint) { so.On("viewport resize", func(cols, rows uint) {

View File

@@ -2,6 +2,7 @@ package pwd
import ( import (
"log" "log"
"sync/atomic"
"time" "time"
"github.com/play-with-docker/play-with-docker/pwd/types" "github.com/play-with-docker/play-with-docker/pwd/types"
@@ -11,6 +12,7 @@ func (p *pwd) ClientNew(id string, session *types.Session) *types.Client {
defer observeAction("ClientNew", time.Now()) defer observeAction("ClientNew", time.Now())
c := &types.Client{Id: id, Session: session} c := &types.Client{Id: id, Session: session}
session.Clients = append(session.Clients, c) session.Clients = append(session.Clients, c)
p.clientCount = atomic.AddInt32(&p.clientCount, 1)
return c return c
} }
@@ -29,6 +31,7 @@ func (p *pwd) ClientClose(client *types.Client) {
for i, cl := range session.Clients { for i, cl := range session.Clients {
if cl.Id == client.Id { if cl.Id == client.Id {
session.Clients = append(session.Clients[:i], session.Clients[i+1:]...) session.Clients = append(session.Clients[:i], session.Clients[i+1:]...)
p.clientCount = atomic.AddInt32(&p.clientCount, -1)
break break
} }
} }
@@ -38,6 +41,10 @@ func (p *pwd) ClientClose(client *types.Client) {
p.setGauges() p.setGauges()
} }
func (p *pwd) ClientCount() int {
return int(atomic.LoadInt32(&p.clientCount))
}
func (p *pwd) notifyClientSmallestViewPort(session *types.Session) { func (p *pwd) notifyClientSmallestViewPort(session *types.Session) {
vp := p.SessionGetSmallestViewPort(session) vp := p.SessionGetSmallestViewPort(session)
// Resize all terminals in the session // Resize all terminals in the session

View File

@@ -24,6 +24,21 @@ func TestClientNew(t *testing.T) {
assert.Equal(t, types.Client{Id: "foobar", Session: session, ViewPort: types.ViewPort{Cols: 0, Rows: 0}}, *client) assert.Equal(t, types.Client{Id: "foobar", Session: session, ViewPort: types.ViewPort{Cols: 0, Rows: 0}}, *client)
assert.Contains(t, session.Clients, client) assert.Contains(t, session.Clients, client)
} }
func TestClientCount(t *testing.T) {
docker := &mockDocker{}
tasks := &mockTasks{}
broadcast := &mockBroadcast{}
storage := &mockStorage{}
p := NewPWD(docker, tasks, broadcast, storage)
session, err := p.SessionNew(time.Hour, "", "", "")
assert.Nil(t, err)
p.ClientNew("foobar", session)
assert.Equal(t, 1, p.ClientCount())
}
func TestClientResizeViewPort(t *testing.T) { func TestClientResizeViewPort(t *testing.T) {
docker := &mockDocker{} docker := &mockDocker{}

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"net"
"net/http" "net/http"
"path/filepath" "path/filepath"
"strings" "strings"
@@ -22,6 +23,8 @@ type sessionWriter struct {
broadcast BroadcastApi broadcast BroadcastApi
} }
var terms = make(map[string]map[string]net.Conn)
func (s *sessionWriter) Write(p []byte) (n int, err error) { func (s *sessionWriter) Write(p []byte) (n int, err error) {
s.broadcast.BroadcastTo(s.sessionId, "terminal out", s.instanceName, string(p)) s.broadcast.BroadcastTo(s.sessionId, "terminal out", s.instanceName, string(p))
return len(p), nil return len(p), nil
@@ -45,6 +48,10 @@ func (p *pwd) InstanceResizeTerminal(instance *types.Instance, rows, cols uint)
} }
func (p *pwd) InstanceAttachTerminal(instance *types.Instance) error { func (p *pwd) InstanceAttachTerminal(instance *types.Instance) error {
// already have a connection for this instance
if getInstanceTermConn(instance.SessionId, instance.Name) != nil {
return nil
}
conn, err := p.docker.CreateAttachConnection(instance.Name) conn, err := p.docker.CreateAttachConnection(instance.Name)
if err != nil { if err != nil {
@@ -53,9 +60,12 @@ func (p *pwd) InstanceAttachTerminal(instance *types.Instance) error {
encoder := encoding.Replacement.NewEncoder() encoder := encoding.Replacement.NewEncoder()
sw := &sessionWriter{sessionId: instance.Session.Id, instanceName: instance.Name, broadcast: p.broadcast} sw := &sessionWriter{sessionId: instance.Session.Id, instanceName: instance.Name, broadcast: p.broadcast}
instance.Terminal = conn if terms[instance.SessionId] == nil {
terms[instance.SessionId] = map[string]net.Conn{instance.Name: conn}
} else {
terms[instance.SessionId][instance.Name] = conn
}
io.Copy(encoder.Writer(sw), conn) io.Copy(encoder.Writer(sw), conn)
return nil return nil
} }
@@ -82,6 +92,18 @@ func (p *pwd) InstanceUploadFromUrl(instance *types.Instance, url string) error
return nil return nil
} }
func (p *pwd) InstanceUploadFromReader(instance *types.Instance, fileName string, reader io.Reader) error {
defer observeAction("InstanceUploadFromReader", time.Now())
copyErr := p.docker.CopyToContainer(instance.Name, "/var/run/pwd/uploads", fileName, reader)
if copyErr != nil {
return fmt.Errorf("Error while uploading file [%s]. Error: %s\n", fileName, copyErr)
}
return nil
}
func (p *pwd) InstanceGet(session *types.Session, name string) *types.Instance { func (p *pwd) InstanceGet(session *types.Session, name string) *types.Instance {
defer observeAction("InstanceGet", time.Now()) defer observeAction("InstanceGet", time.Now())
return session.Instances[name] return session.Instances[name]
@@ -118,8 +140,10 @@ func (p *pwd) InstanceFindByAlias(sessionPrefix, alias string) *types.Instance {
func (p *pwd) InstanceDelete(session *types.Session, instance *types.Instance) error { func (p *pwd) InstanceDelete(session *types.Session, instance *types.Instance) error {
defer observeAction("InstanceDelete", time.Now()) defer observeAction("InstanceDelete", time.Now())
if instance.Terminal != nil { conn := getInstanceTermConn(session.Id, instance.Name)
instance.Terminal.Close() if conn != nil {
conn.Close()
delete(terms[instance.SessionId], instance.Name)
} }
err := p.docker.DeleteContainer(instance.Name) err := p.docker.DeleteContainer(instance.Name)
if err != nil && !strings.Contains(err.Error(), "No such container") { if err != nil && !strings.Contains(err.Error(), "No such container") {
@@ -130,7 +154,7 @@ func (p *pwd) InstanceDelete(session *types.Session, instance *types.Instance) e
p.broadcast.BroadcastTo(session.Id, "delete instance", instance.Name) p.broadcast.BroadcastTo(session.Id, "delete instance", instance.Name)
delete(session.Instances, instance.Name) delete(session.Instances, instance.Name)
if err := p.storage.SessionPut(session); err != nil { if err := p.storage.InstanceDelete(session.Id, instance.Name); err != nil {
return err return err
} }
@@ -202,6 +226,7 @@ func (p *pwd) InstanceNew(session *types.Session, conf InstanceConfig) (*types.I
instance := &types.Instance{} instance := &types.Instance{}
instance.Image = opts.Image instance.Image = opts.Image
instance.IP = ip instance.IP = ip
instance.SessionId = session.Id
instance.Name = containerName instance.Name = containerName
instance.Hostname = conf.Hostname instance.Hostname = conf.Hostname
instance.Alias = conf.Alias instance.Alias = conf.Alias
@@ -221,7 +246,7 @@ func (p *pwd) InstanceNew(session *types.Session, conf InstanceConfig) (*types.I
go p.InstanceAttachTerminal(instance) go p.InstanceAttachTerminal(instance)
err = p.storage.SessionPut(session) err = p.storage.InstanceCreate(session.Id, instance)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -233,10 +258,11 @@ func (p *pwd) InstanceNew(session *types.Session, conf InstanceConfig) (*types.I
return instance, nil return instance, nil
} }
func (p *pwd) InstanceWriteToTerminal(instance *types.Instance, data string) { func (p *pwd) InstanceWriteToTerminal(sessionId, instanceName string, data string) {
defer observeAction("InstanceWriteToTerminal", time.Now()) defer observeAction("InstanceWriteToTerminal", time.Now())
if instance != nil && instance.Terminal != nil && len(data) > 0 { conn := getInstanceTermConn(sessionId, instanceName)
instance.Terminal.Write([]byte(data)) if conn != nil && len(data) > 0 {
conn.Write([]byte(data))
} }
} }
@@ -255,3 +281,7 @@ func (p *pwd) InstanceExec(instance *types.Instance, cmd []string) (int, error)
defer observeAction("InstanceExec", time.Now()) defer observeAction("InstanceExec", time.Now())
return p.docker.Exec(instance.Name, cmd) return p.docker.Exec(instance.Name, cmd)
} }
func getInstanceTermConn(sessionId, instanceName string) net.Conn {
return terms[sessionId][instanceName]
}

View File

@@ -1,7 +1,9 @@
package pwd package pwd
import ( import (
"errors"
"fmt" "fmt"
"net"
"sync" "sync"
"testing" "testing"
"time" "time"
@@ -69,6 +71,7 @@ func TestInstanceNew(t *testing.T) {
Alias: "", Alias: "",
Image: config.GetDindImageName(), Image: config.GetDindImageName(),
IsDockerHost: true, IsDockerHost: true,
SessionId: session.Id,
Session: session, Session: session,
} }
@@ -159,6 +162,7 @@ func TestInstanceNew_WithNotAllowedImage(t *testing.T) {
IP: "10.0.0.1", IP: "10.0.0.1",
Alias: "", Alias: "",
Image: "redis", Image: "redis",
SessionId: session.Id,
IsDockerHost: false, IsDockerHost: false,
Session: session, Session: session,
} }
@@ -209,6 +213,7 @@ func TestInstanceNew_WithCustomHostname(t *testing.T) {
Image: "redis", Image: "redis",
IsDockerHost: false, IsDockerHost: false,
Session: session, Session: session,
SessionId: session.Id,
} }
assert.Equal(t, expectedInstance, *instance) assert.Equal(t, expectedInstance, *instance)
@@ -239,3 +244,39 @@ func TestInstanceAllowedImages(t *testing.T) {
assert.Equal(t, expectedImages, p.InstanceAllowedImages()) assert.Equal(t, expectedImages, p.InstanceAllowedImages())
} }
type errConn struct {
*mockConn
}
func (ec errConn) Read(b []byte) (int, error) {
return 0, errors.New("Error")
}
func TestTermConnAssignment(t *testing.T) {
dock := &mockDocker{}
tasks := &mockTasks{}
broadcast := &mockBroadcast{}
storage := &mockStorage{}
dock.createAttachConnection = func(name string) (net.Conn, error) {
// return error connection to unlock the goroutine
return errConn{}, nil
}
p := NewPWD(dock, tasks, broadcast, storage)
session, _ := p.SessionNew(time.Hour, "", "", "")
mockInstance := &types.Instance{
Name: fmt.Sprintf("%s_redis-master", session.Id[:8]),
Hostname: "redis-master",
IP: "10.0.0.1",
Alias: "",
SessionId: session.Id,
Image: "redis",
IsDockerHost: false,
Session: session,
}
p.InstanceAttachTerminal(mockInstance)
assert.NotNil(t, getInstanceTermConn(session.Id, mockInstance.Name))
}

View File

@@ -1,6 +1,7 @@
package pwd package pwd
import ( import (
"io"
"time" "time"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
@@ -42,10 +43,11 @@ func init() {
} }
type pwd struct { type pwd struct {
docker docker.DockerApi docker docker.DockerApi
tasks SchedulerApi tasks SchedulerApi
broadcast BroadcastApi broadcast BroadcastApi
storage storage.StorageApi storage storage.StorageApi
clientCount int32
} }
type PWDApi interface { type PWDApi interface {
@@ -60,18 +62,21 @@ type PWDApi interface {
InstanceResizeTerminal(instance *types.Instance, cols, rows uint) error InstanceResizeTerminal(instance *types.Instance, cols, rows uint) error
InstanceAttachTerminal(instance *types.Instance) error InstanceAttachTerminal(instance *types.Instance) error
InstanceUploadFromUrl(instance *types.Instance, url string) error InstanceUploadFromUrl(instance *types.Instance, url string) error
InstanceUploadFromReader(instance *types.Instance, filename string, reader io.Reader) error
InstanceGet(session *types.Session, name string) *types.Instance InstanceGet(session *types.Session, name string) *types.Instance
// TODO remove this function when we add the session prefix to the PWD url
InstanceFindByIP(ip string) *types.Instance InstanceFindByIP(ip string) *types.Instance
InstanceFindByAlias(sessionPrefix, alias string) *types.Instance InstanceFindByAlias(sessionPrefix, alias string) *types.Instance
InstanceFindByIPAndSession(sessionPrefix, ip string) *types.Instance InstanceFindByIPAndSession(sessionPrefix, ip string) *types.Instance
InstanceDelete(session *types.Session, instance *types.Instance) error InstanceDelete(session *types.Session, instance *types.Instance) error
InstanceWriteToTerminal(instance *types.Instance, data string) InstanceWriteToTerminal(sessionId, instanceName string, data string)
InstanceAllowedImages() []string InstanceAllowedImages() []string
InstanceExec(instance *types.Instance, cmd []string) (int, error) InstanceExec(instance *types.Instance, cmd []string) (int, error)
ClientNew(id string, session *types.Session) *types.Client ClientNew(id string, session *types.Session) *types.Client
ClientResizeViewPort(client *types.Client, cols, rows uint) ClientResizeViewPort(client *types.Client, cols, rows uint)
ClientClose(client *types.Client) ClientClose(client *types.Client)
ClientCount() int
} }
func NewPWD(d docker.DockerApi, t SchedulerApi, b BroadcastApi, s storage.StorageApi) *pwd { func NewPWD(d docker.DockerApi, t SchedulerApi, b BroadcastApi, s storage.StorageApi) *pwd {
@@ -83,7 +88,7 @@ func (p *pwd) setGauges() {
ses := float64(s) ses := float64(s)
i, _ := p.storage.InstanceCount() i, _ := p.storage.InstanceCount()
ins := float64(i) ins := float64(i)
c, _ := p.storage.ClientCount() c := p.ClientCount()
cli := float64(c) cli := float64(c)
clientsGauge.Set(cli) clientsGauge.Set(cli)

View File

@@ -15,6 +15,8 @@ import (
"github.com/twinj/uuid" "github.com/twinj/uuid"
) )
var preparedSessions = map[string]bool{}
type sessionBuilderWriter struct { type sessionBuilderWriter struct {
sessionId string sessionId string
broadcast BroadcastApi broadcast BroadcastApi
@@ -64,7 +66,7 @@ func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName str
} }
log.Printf("Network [%s] created for session [%s]\n", s.Id, s.Id) log.Printf("Network [%s] created for session [%s]\n", s.Id, s.Id)
if err := p.prepareSession(s); err != nil { if _, err := p.prepareSession(s); err != nil {
log.Println(err) log.Println(err)
return nil, err return nil, err
} }
@@ -183,7 +185,7 @@ func (p *pwd) SessionGet(sessionId string) *types.Session {
s, _ := p.storage.SessionGet(sessionId) s, _ := p.storage.SessionGet(sessionId)
if err := p.prepareSession(s); err != nil { if _, err := p.prepareSession(s); err != nil {
log.Println(err) log.Println(err)
return nil return nil
} }
@@ -280,22 +282,27 @@ func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error
return nil return nil
} }
func isSessionPrepared(sessionId string) bool {
_, ok := preparedSessions[sessionId]
return ok
}
// This function should be called any time a session needs to be prepared: // This function should be called any time a session needs to be prepared:
// 1. Like when it is created // 1. Like when it is created
// 2. When it was loaded from storage // 2. When it was loaded from storage
func (p *pwd) prepareSession(session *types.Session) error { func (p *pwd) prepareSession(session *types.Session) (bool, error) {
session.Lock() session.Lock()
defer session.Unlock() defer session.Unlock()
if session.IsPrepared() { if isSessionPrepared(session.Id) {
return nil return false, nil
} }
p.scheduleSessionClose(session) p.scheduleSessionClose(session)
// Connect PWD daemon to the new network // Connect PWD daemon to the new network
if err := p.connectToNetwork(session); err != nil { if err := p.connectToNetwork(session); err != nil {
return err return false, err
} }
// Schedule periodic tasks // Schedule periodic tasks
@@ -306,9 +313,9 @@ func (p *pwd) prepareSession(session *types.Session) error {
i.Session = session i.Session = session
go p.InstanceAttachTerminal(i) go p.InstanceAttachTerminal(i)
} }
session.SetPrepared() preparedSessions[session.Id] = true
return nil return true, nil
} }
func (p *pwd) scheduleSessionClose(s *types.Session) { func (p *pwd) scheduleSessionClose(s *types.Session) {

View File

@@ -209,10 +209,10 @@ func TestSessionSetup(t *testing.T) {
Image: "franela/dind", Image: "franela/dind",
Hostname: "manager1", Hostname: "manager1",
IP: "10.0.0.1", IP: "10.0.0.1",
SessionId: s.Id,
Alias: "", Alias: "",
IsDockerHost: true, IsDockerHost: true,
Session: s, Session: s,
Terminal: manager1Received.Terminal,
Docker: manager1Received.Docker, Docker: manager1Received.Docker,
}, manager1Received) }, manager1Received)
@@ -225,8 +225,8 @@ func TestSessionSetup(t *testing.T) {
IP: "10.0.0.2", IP: "10.0.0.2",
Alias: "", Alias: "",
IsDockerHost: true, IsDockerHost: true,
SessionId: s.Id,
Session: s, Session: s,
Terminal: manager2Received.Terminal,
Docker: manager2Received.Docker, Docker: manager2Received.Docker,
}, manager2Received) }, manager2Received)
@@ -238,9 +238,9 @@ func TestSessionSetup(t *testing.T) {
Hostname: "manager3", Hostname: "manager3",
IP: "10.0.0.3", IP: "10.0.0.3",
Alias: "", Alias: "",
SessionId: s.Id,
IsDockerHost: true, IsDockerHost: true,
Session: s, Session: s,
Terminal: manager3Received.Terminal,
Docker: manager3Received.Docker, Docker: manager3Received.Docker,
}, manager3Received) }, manager3Received)
@@ -252,9 +252,9 @@ func TestSessionSetup(t *testing.T) {
Hostname: "worker1", Hostname: "worker1",
IP: "10.0.0.4", IP: "10.0.0.4",
Alias: "", Alias: "",
SessionId: s.Id,
IsDockerHost: true, IsDockerHost: true,
Session: s, Session: s,
Terminal: worker1Received.Terminal,
Docker: worker1Received.Docker, Docker: worker1Received.Docker,
}, worker1Received) }, worker1Received)
@@ -266,9 +266,9 @@ func TestSessionSetup(t *testing.T) {
Hostname: "other", Hostname: "other",
IP: "10.0.0.5", IP: "10.0.0.5",
Alias: "", Alias: "",
SessionId: s.Id,
IsDockerHost: true, IsDockerHost: true,
Session: s, Session: s,
Terminal: otherReceived.Terminal,
Docker: otherReceived.Docker, Docker: otherReceived.Docker,
}, otherReceived) }, otherReceived)
@@ -277,3 +277,20 @@ func TestSessionSetup(t *testing.T) {
assert.True(t, manager3JoinedHasManager) assert.True(t, manager3JoinedHasManager)
assert.True(t, worker1JoinedHasWorker) assert.True(t, worker1JoinedHasWorker)
} }
func TestSessionPrepareOnce(t *testing.T) {
dock := &mockDocker{}
tasks := &mockTasks{}
broadcast := &mockBroadcast{}
storage := &mockStorage{}
p := NewPWD(dock, tasks, broadcast, storage)
session := &types.Session{Id: "1234"}
prepared, err := p.prepareSession(session)
assert.True(t, preparedSessions[session.Id])
assert.True(t, prepared)
prepared, err = p.prepareSession(session)
assert.Nil(t, err)
assert.False(t, prepared)
}

View File

@@ -10,6 +10,8 @@ type mockStorage struct {
instanceFindByAlias func(sessionPrefix, alias string) (*types.Instance, error) instanceFindByAlias func(sessionPrefix, alias string) (*types.Instance, error)
instanceFindByIP func(ip string) (*types.Instance, error) instanceFindByIP func(ip string) (*types.Instance, error)
instanceFindByIPAndSession func(sessionPrefix, ip string) (*types.Instance, error) instanceFindByIPAndSession func(sessionPrefix, ip string) (*types.Instance, error)
instanceCreate func(string, *types.Instance) error
instanceDelete func(sessionId string, instanceName string) error
instanceCount func() (int, error) instanceCount func() (int, error)
clientCount func() (int, error) clientCount func() (int, error)
} }
@@ -56,6 +58,18 @@ func (m *mockStorage) InstanceFindByIPAndSession(sessionPrefix, ip string) (*typ
} }
return nil, nil return nil, nil
} }
func (m *mockStorage) InstanceCreate(sessionId string, instance *types.Instance) error {
if m.instanceCreate != nil {
return m.instanceCreate(sessionId, instance)
}
return nil
}
func (m *mockStorage) InstanceDelete(sessionId, instanceName string) error {
if m.instanceDelete != nil {
return m.instanceDelete(sessionId, instanceName)
}
return nil
}
func (m *mockStorage) InstanceCount() (int, error) { func (m *mockStorage) InstanceCount() (int, error) {
if m.instanceCount != nil { if m.instanceCount != nil {
return m.instanceCount() return m.instanceCount()

View File

@@ -33,7 +33,7 @@ type scheduler struct {
} }
func (sch *scheduler) Schedule(s *types.Session) { func (sch *scheduler) Schedule(s *types.Session) {
if s.IsPrepared() { if isSessionPrepared(s.Id) {
return return
} }

View File

@@ -2,7 +2,6 @@ package types
import ( import (
"context" "context"
"net"
"sync" "sync"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
@@ -15,27 +14,28 @@ func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type Instance struct { type Instance struct {
Image string `json:"image"` Image string `json:"image" bson:"image"`
Name string `json:"name"` Name string `json:"name" bson:"name"`
Hostname string `json:"hostname"` Hostname string `json:"hostname" bson:"hostname"`
IP string `json:"ip"` IP string `json:"ip" bson:"ip"`
IsManager *bool `json:"is_manager"` IsManager *bool `json:"is_manager" bson:"is_manager"`
Mem string `json:"mem"` Mem string `json:"mem" bson:"mem"`
Cpu string `json:"cpu"` Cpu string `json:"cpu" bson:"cpu"`
Alias string `json:"alias"` Alias string `json:"alias" bson:"alias"`
ServerCert []byte `json:"server_cert"` ServerCert []byte `json:"server_cert" bson:"server_cert"`
ServerKey []byte `json:"server_key"` ServerKey []byte `json:"server_key" bson:"server_key"`
CACert []byte `json:"ca_cert"` CACert []byte `json:"ca_cert" bson:"ca_cert"`
Cert []byte `json:"cert"` Cert []byte `json:"cert" bson:"cert"`
Key []byte `json:"key"` Key []byte `json:"key" bson:"key"`
IsDockerHost bool `json:"is_docker_host"` IsDockerHost bool `json:"is_docker_host" bson:"is_docker_host"`
Docker docker.DockerApi `json:"-"` SessionId string `json:"session_id" bson:"session_id"`
Session *Session `json:"-"` SessionPrefix string `json:"session_prefix" bson:"session_prefix"`
Terminal net.Conn `json:"-"` Docker docker.DockerApi `json:"-"`
ctx context.Context `json:"-"` Session *Session `json:"-" bson:"-"`
tempPorts []uint16 `json:"-"` ctx context.Context `json:"-" bson:"-"`
Ports UInt16Slice tempPorts []uint16 `json:"-" bson:"-"`
rw sync.Mutex Ports UInt16Slice
rw sync.Mutex
} }
func (i *Instance) SetUsedPort(port uint16) { func (i *Instance) SetUsedPort(port uint16) {

View File

@@ -7,7 +7,7 @@ import (
type Session struct { type Session struct {
Id string `json:"id"` Id string `json:"id"`
Instances map[string]*Instance `json:"instances"` Instances map[string]*Instance `json:"instances" bson:"-"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"` ExpiresAt time.Time `json:"expires_at"`
PwdIpAddress string `json:"pwd_ip_address"` PwdIpAddress string `json:"pwd_ip_address"`
@@ -16,12 +16,11 @@ type Session struct {
StackName string `json:"stack_name"` StackName string `json:"stack_name"`
ImageName string `json:"image_name"` ImageName string `json:"image_name"`
Host string `json:"host"` Host string `json:"host"`
Clients []*Client `json:"-"` Clients []*Client `json:"-" bson:"-"`
closingTimer *time.Timer `json:"-"` closingTimer *time.Timer `json:"-"`
scheduled bool `json:"-"` scheduled bool `json:"-"`
ticker *time.Ticker `json:"-"` ticker *time.Ticker `json:"-"`
rw sync.Mutex `json:"-"` rw sync.Mutex `json:"-"`
prepared bool `json:"-"`
} }
func (s *Session) Lock() { func (s *Session) Lock() {
@@ -46,11 +45,3 @@ func (s *Session) SetClosingTimer(t *time.Timer) {
func (s *Session) ClosingTimer() *time.Timer { func (s *Session) ClosingTimer() *time.Timer {
return s.closingTimer return s.closingTimer
} }
func (s *Session) IsPrepared() bool {
return s.prepared
}
func (s *Session) SetPrepared() {
s.prepared = true
}

View File

@@ -31,6 +31,10 @@ func (store *storage) SessionPut(s *types.Session) error {
store.rw.Lock() store.rw.Lock()
defer store.rw.Unlock() defer store.rw.Unlock()
// Initialize instances map if nil
if s.Instances == nil {
s.Instances = map[string]*types.Instance{}
}
store.db[s.Id] = s store.db[s.Id] = s
return store.save() return store.save()
@@ -85,6 +89,24 @@ func (store *storage) InstanceFindByAlias(sessionPrefix, alias string) (*types.I
return nil, fmt.Errorf("%s", notFound) return nil, fmt.Errorf("%s", notFound)
} }
func (store *storage) InstanceCreate(sessionId string, instance *types.Instance) error {
store.rw.Lock()
defer store.rw.Unlock()
s, found := store.db[sessionId]
if !found {
return fmt.Errorf("Session %s", notFound)
}
s.Instances[instance.Name] = instance
return store.save()
}
func (store *storage) InstanceDelete(sessionId, name string) error {
panic("not implemented")
}
func (store *storage) SessionCount() (int, error) { func (store *storage) SessionCount() (int, error) {
store.rw.Lock() store.rw.Lock()
defer store.rw.Unlock() defer store.rw.Unlock()

View File

@@ -191,6 +191,33 @@ func TestInstanceFindByAlias(t *testing.T) {
assert.Nil(t, foundInstance) assert.Nil(t, foundInstance)
} }
func TestInstanceCreate(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil {
log.Fatal(err)
}
tmpfile.Close()
os.Remove(tmpfile.Name())
defer os.Remove(tmpfile.Name())
storage, err := NewFileStorage(tmpfile.Name())
assert.Nil(t, err)
i1 := &types.Instance{Name: "i1", Alias: "foo", IP: "10.0.0.1"}
s1 := &types.Session{Id: "session1"}
err = storage.SessionPut(s1)
assert.Nil(t, err)
err = storage.InstanceCreate(s1.Id, i1)
assert.Nil(t, err)
loadedSession, err := storage.SessionGet("session1")
assert.Nil(t, err)
assert.Equal(t, i1, loadedSession.Instances["i1"])
}
func TestCounts(t *testing.T) { func TestCounts(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "pwd") tmpfile, err := ioutil.TempFile("", "pwd")
if err != nil { if err != nil {
@@ -226,9 +253,6 @@ func TestCounts(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 2, num) assert.Equal(t, 2, num)
num, err = storage.ClientCount()
assert.Nil(t, err)
assert.Equal(t, 1, num)
} }
func TestSessionDelete(t *testing.T) { func TestSessionDelete(t *testing.T) {

View File

@@ -9,16 +9,17 @@ func NotFound(e error) bool {
} }
type StorageApi interface { type StorageApi interface {
SessionGet(sessionId string) (*types.Session, error) SessionGet(string) (*types.Session, error)
SessionPut(*types.Session) error SessionPut(*types.Session) error
SessionCount() (int, error) SessionCount() (int, error)
SessionDelete(sessionId string) error SessionDelete(string) error
InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error) InstanceFindByAlias(sessionPrefix, alias string) (*types.Instance, error)
// Should have the session id too, soon // Should have the session id too, soon
InstanceFindByIP(ip string) (*types.Instance, error) InstanceFindByIP(ip string) (*types.Instance, error)
InstanceFindByIPAndSession(sessionPrefix, ip string) (*types.Instance, error) InstanceFindByIPAndSession(sessionPrefix, ip string) (*types.Instance, error)
InstanceCount() (int, error) InstanceCreate(sessionId string, instance *types.Instance) error
InstanceDelete(sessionId, instanceName string) error
ClientCount() (int, error) InstanceCount() (int, error)
} }