From 33febafb435d3bbc272f176eba8ad498d8bac4a7 Mon Sep 17 00:00:00 2001 From: "Jonathan Leibiusky @xetorthio" Date: Sat, 10 Jun 2017 18:18:30 -0300 Subject: [PATCH] Load sessions in parallel --- handlers/bootstrap.go | 5 +++ pwd/docker_mock_test.go | 20 +++++---- pwd/session.go | 27 +++++++---- pwd/session_test.go | 99 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 134 insertions(+), 17 deletions(-) diff --git a/handlers/bootstrap.go b/handlers/bootstrap.go index 08e8205..cb8252f 100644 --- a/handlers/bootstrap.go +++ b/handlers/bootstrap.go @@ -3,6 +3,7 @@ package handlers import ( "log" "os" + "time" "github.com/docker/docker/client" "github.com/play-with-docker/play-with-docker/docker" @@ -31,7 +32,11 @@ func Bootstrap() { core = pwd.NewPWD(d, t, Broadcast, s) + loadStart := time.Now() err = core.SessionLoadAndPrepare() + loadElapsed := time.Since(loadStart) + log.Printf("***************** Loading stored sessions took %s\n", loadElapsed) + if err != nil && !os.IsNotExist(err) { log.Fatal("Error decoding sessions from disk ", err) } diff --git a/pwd/docker_mock_test.go b/pwd/docker_mock_test.go index faa05ff..6bf8494 100644 --- a/pwd/docker_mock_test.go +++ b/pwd/docker_mock_test.go @@ -10,14 +10,15 @@ import ( ) type mockDocker struct { - createNetwork func(string) error - connectNetwork func(container, network, ip string) (string, error) - containerResize func(string, uint, uint) error - createContainer func(opts docker.CreateContainerOpts) (string, error) - execAttach func(instanceName string, command []string, out io.Writer) (int, error) - new func(ip string, cert, key []byte) (docker.DockerApi, error) - swarmInit func() (*docker.SwarmTokens, error) - swarmJoin func(addr, token string) error + createNetwork func(string) error + connectNetwork func(container, network, ip string) (string, error) + containerResize func(string, uint, uint) error + createContainer func(opts docker.CreateContainerOpts) (string, error) + execAttach func(instanceName string, command []string, out io.Writer) (int, error) + new func(ip string, cert, key []byte) (docker.DockerApi, error) + swarmInit func() (*docker.SwarmTokens, error) + swarmJoin func(addr, token string) error + createAttachConnection func(name string) (net.Conn, error) } func (m *mockDocker) CreateNetwork(id string) error { @@ -53,6 +54,9 @@ func (m *mockDocker) ContainerResize(name string, rows, cols uint) error { return nil } func (m *mockDocker) CreateAttachConnection(name string) (net.Conn, error) { + if m.createAttachConnection != nil { + return m.createAttachConnection(name) + } return &mockConn{}, nil } func (m *mockDocker) CopyToContainer(containerName, destination, fileName string, content io.Reader) error { diff --git a/pwd/session.go b/pwd/session.go index a6e596c..4cec7b1 100644 --- a/pwd/session.go +++ b/pwd/session.go @@ -197,22 +197,31 @@ func (p *pwd) SessionLoadAndPrepare() error { return err } + wg := sync.WaitGroup{} for _, s := range sessions { - err := p.prepareSession(s) - if err != nil { - return err - } - for _, i := range s.Instances { - // wire the session back to the instance - i.session = s - go p.InstanceAttachTerminal(i) - } // Connect PWD daemon to the new network if s.PwdIpAddress == "" { return fmt.Errorf("Cannot load stored sessions as they don't have the pwd ip address stored with them") } + wg.Add(1) + go func(s *Session) { + s.rw.Lock() + defer s.rw.Unlock() + defer wg.Done() + + err := p.prepareSession(s) + if err != nil { + log.Println(err) + } + for _, i := range s.Instances { + // wire the session back to the instance + i.session = s + go p.InstanceAttachTerminal(i) + } + }(s) } + wg.Wait() setGauges() return nil diff --git a/pwd/session_test.go b/pwd/session_test.go index 9533e2c..678495c 100644 --- a/pwd/session_test.go +++ b/pwd/session_test.go @@ -2,6 +2,8 @@ package pwd import ( "fmt" + "net" + "sync" "testing" "time" @@ -278,3 +280,100 @@ func TestSessionSetup(t *testing.T) { assert.True(t, manager3JoinedHasManager) assert.True(t, worker1JoinedHasWorker) } + +func TestSessionLoadAndPrepare(t *testing.T) { + config.PWDContainerName = "pwd" + lock := sync.Mutex{} + var s1NetworkConnect []string + var s2NetworkConnect []string + + wg := sync.WaitGroup{} + wg.Add(3) + connectedInstances := []string{} + sessions = map[string]*Session{} + i1 := &Instance{ + Image: "dind", + Name: "session1_i1", + Hostname: "i1", + IP: "10.0.0.10", + IsDockerHost: true, + } + i2 := &Instance{ + Image: "dind", + Name: "session1_i2", + Hostname: "i1", + IP: "10.0.0.11", + IsDockerHost: true, + } + i3 := &Instance{ + Image: "dind", + Name: "session1_i3", + Hostname: "i1", + IP: "10.0.0.12", + IsDockerHost: true, + } + s1 := &Session{ + Id: "session1", + Instances: map[string]*Instance{"session1_i1": i1}, + CreatedAt: time.Now(), + ExpiresAt: time.Now().Add(time.Hour), + PwdIpAddress: "10.0.0.1", + Ready: true, + Stack: "", + StackName: "", + } + s2 := &Session{ + Id: "session2", + Instances: map[string]*Instance{"session1_i2": i2, "session1_i3": i3}, + CreatedAt: time.Now(), + ExpiresAt: time.Now().Add(time.Hour), + PwdIpAddress: "10.0.0.2", + Ready: true, + Stack: "", + StackName: "", + } + + dock := &mockDocker{} + dock.createAttachConnection = func(instanceName string) (net.Conn, error) { + lock.Lock() + defer lock.Unlock() + connectedInstances = append(connectedInstances, instanceName) + wg.Done() + return &mockConn{}, nil + } + dock.connectNetwork = func(container, network, ip string) (string, error) { + if s1.Id == network { + s1NetworkConnect = []string{container, network, ip} + } else if s2.Id == network { + s2NetworkConnect = []string{container, network, ip} + } + return ip, nil + } + tasks := &mockTasks{} + tasks.schedule = func(s *Session) { + s.ticker = time.NewTicker(1 * time.Second) + } + broadcast := &mockBroadcast{} + storage := &mockStorage{} + + storage.load = func() error { + sessions = map[string]*Session{"session1": s1, "session2": s2} + return nil + } + + p := NewPWD(dock, tasks, broadcast, storage) + + err := p.SessionLoadAndPrepare() + assert.Nil(t, err) + assert.Len(t, sessions, 2) + assert.NotNil(t, s1.closingTimer) + assert.NotNil(t, s2.closingTimer) + assert.NotNil(t, s1.ticker) + assert.NotNil(t, s2.ticker) + + assert.Equal(t, []string{"pwd", s1.Id, s1.PwdIpAddress}, s1NetworkConnect) + assert.Equal(t, []string{"pwd", s2.Id, s2.PwdIpAddress}, s2NetworkConnect) + + wg.Wait() + assert.Subset(t, connectedInstances, []string{i1.Name, i2.Name, i3.Name}) +}