Load sessions in parallel

This commit is contained in:
Jonathan Leibiusky @xetorthio
2017-06-10 18:18:30 -03:00
parent a95d1cd08d
commit 33febafb43
4 changed files with 134 additions and 17 deletions

View File

@@ -3,6 +3,7 @@ package handlers
import ( import (
"log" "log"
"os" "os"
"time"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/docker"
@@ -31,7 +32,11 @@ func Bootstrap() {
core = pwd.NewPWD(d, t, Broadcast, s) core = pwd.NewPWD(d, t, Broadcast, s)
loadStart := time.Now()
err = core.SessionLoadAndPrepare() err = core.SessionLoadAndPrepare()
loadElapsed := time.Since(loadStart)
log.Printf("***************** Loading stored sessions took %s\n", loadElapsed)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
log.Fatal("Error decoding sessions from disk ", err) log.Fatal("Error decoding sessions from disk ", err)
} }

View File

@@ -10,14 +10,15 @@ import (
) )
type mockDocker struct { type mockDocker struct {
createNetwork func(string) error createNetwork func(string) error
connectNetwork func(container, network, ip string) (string, error) connectNetwork func(container, network, ip string) (string, error)
containerResize func(string, uint, uint) error containerResize func(string, uint, uint) error
createContainer func(opts docker.CreateContainerOpts) (string, error) createContainer func(opts docker.CreateContainerOpts) (string, error)
execAttach func(instanceName string, command []string, out io.Writer) (int, error) execAttach func(instanceName string, command []string, out io.Writer) (int, error)
new func(ip string, cert, key []byte) (docker.DockerApi, error) new func(ip string, cert, key []byte) (docker.DockerApi, error)
swarmInit func() (*docker.SwarmTokens, error) swarmInit func() (*docker.SwarmTokens, error)
swarmJoin func(addr, token string) error swarmJoin func(addr, token string) error
createAttachConnection func(name string) (net.Conn, error)
} }
func (m *mockDocker) CreateNetwork(id string) error { func (m *mockDocker) CreateNetwork(id string) error {
@@ -53,6 +54,9 @@ func (m *mockDocker) ContainerResize(name string, rows, cols uint) error {
return nil return nil
} }
func (m *mockDocker) CreateAttachConnection(name string) (net.Conn, error) { func (m *mockDocker) CreateAttachConnection(name string) (net.Conn, error) {
if m.createAttachConnection != nil {
return m.createAttachConnection(name)
}
return &mockConn{}, nil return &mockConn{}, nil
} }
func (m *mockDocker) CopyToContainer(containerName, destination, fileName string, content io.Reader) error { func (m *mockDocker) CopyToContainer(containerName, destination, fileName string, content io.Reader) error {

View File

@@ -197,22 +197,31 @@ func (p *pwd) SessionLoadAndPrepare() error {
return err return err
} }
wg := sync.WaitGroup{}
for _, s := range sessions { for _, s := range sessions {
err := p.prepareSession(s)
if err != nil {
return err
}
for _, i := range s.Instances {
// wire the session back to the instance
i.session = s
go p.InstanceAttachTerminal(i)
}
// Connect PWD daemon to the new network // Connect PWD daemon to the new network
if s.PwdIpAddress == "" { if s.PwdIpAddress == "" {
return fmt.Errorf("Cannot load stored sessions as they don't have the pwd ip address stored with them") return fmt.Errorf("Cannot load stored sessions as they don't have the pwd ip address stored with them")
} }
wg.Add(1)
go func(s *Session) {
s.rw.Lock()
defer s.rw.Unlock()
defer wg.Done()
err := p.prepareSession(s)
if err != nil {
log.Println(err)
}
for _, i := range s.Instances {
// wire the session back to the instance
i.session = s
go p.InstanceAttachTerminal(i)
}
}(s)
} }
wg.Wait()
setGauges() setGauges()
return nil return nil

View File

@@ -2,6 +2,8 @@ package pwd
import ( import (
"fmt" "fmt"
"net"
"sync"
"testing" "testing"
"time" "time"
@@ -278,3 +280,100 @@ func TestSessionSetup(t *testing.T) {
assert.True(t, manager3JoinedHasManager) assert.True(t, manager3JoinedHasManager)
assert.True(t, worker1JoinedHasWorker) assert.True(t, worker1JoinedHasWorker)
} }
func TestSessionLoadAndPrepare(t *testing.T) {
config.PWDContainerName = "pwd"
lock := sync.Mutex{}
var s1NetworkConnect []string
var s2NetworkConnect []string
wg := sync.WaitGroup{}
wg.Add(3)
connectedInstances := []string{}
sessions = map[string]*Session{}
i1 := &Instance{
Image: "dind",
Name: "session1_i1",
Hostname: "i1",
IP: "10.0.0.10",
IsDockerHost: true,
}
i2 := &Instance{
Image: "dind",
Name: "session1_i2",
Hostname: "i1",
IP: "10.0.0.11",
IsDockerHost: true,
}
i3 := &Instance{
Image: "dind",
Name: "session1_i3",
Hostname: "i1",
IP: "10.0.0.12",
IsDockerHost: true,
}
s1 := &Session{
Id: "session1",
Instances: map[string]*Instance{"session1_i1": i1},
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(time.Hour),
PwdIpAddress: "10.0.0.1",
Ready: true,
Stack: "",
StackName: "",
}
s2 := &Session{
Id: "session2",
Instances: map[string]*Instance{"session1_i2": i2, "session1_i3": i3},
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(time.Hour),
PwdIpAddress: "10.0.0.2",
Ready: true,
Stack: "",
StackName: "",
}
dock := &mockDocker{}
dock.createAttachConnection = func(instanceName string) (net.Conn, error) {
lock.Lock()
defer lock.Unlock()
connectedInstances = append(connectedInstances, instanceName)
wg.Done()
return &mockConn{}, nil
}
dock.connectNetwork = func(container, network, ip string) (string, error) {
if s1.Id == network {
s1NetworkConnect = []string{container, network, ip}
} else if s2.Id == network {
s2NetworkConnect = []string{container, network, ip}
}
return ip, nil
}
tasks := &mockTasks{}
tasks.schedule = func(s *Session) {
s.ticker = time.NewTicker(1 * time.Second)
}
broadcast := &mockBroadcast{}
storage := &mockStorage{}
storage.load = func() error {
sessions = map[string]*Session{"session1": s1, "session2": s2}
return nil
}
p := NewPWD(dock, tasks, broadcast, storage)
err := p.SessionLoadAndPrepare()
assert.Nil(t, err)
assert.Len(t, sessions, 2)
assert.NotNil(t, s1.closingTimer)
assert.NotNil(t, s2.closingTimer)
assert.NotNil(t, s1.ticker)
assert.NotNil(t, s2.ticker)
assert.Equal(t, []string{"pwd", s1.Id, s1.PwdIpAddress}, s1NetworkConnect)
assert.Equal(t, []string{"pwd", s2.Id, s2.PwdIpAddress}, s2NetworkConnect)
wg.Wait()
assert.Subset(t, connectedInstances, []string{i1.Name, i2.Name, i3.Name})
}