Make SessionSetup faster and support for command execution

This commit is contained in:
Jonathan Leibiusky @xetorthio
2017-09-11 18:09:27 -03:00
parent 86044788b9
commit fe299fed90
9 changed files with 162 additions and 124 deletions

View File

@@ -30,6 +30,7 @@ const (
)
type DockerApi interface {
GetClient() *client.Client
CreateNetwork(id string, opts types.NetworkCreate) error
ConnectNetwork(container, network, ip string) (string, error)
NetworkInspect(id string) (types.NetworkResource, error)
@@ -49,7 +50,7 @@ type DockerApi interface {
DisconnectNetwork(containerId, networkId string) error
DeleteNetwork(id string) error
Exec(instanceName string, command []string) (int, error)
SwarmInit() (*SwarmTokens, error)
SwarmInit(advertiseAddr string) (*SwarmTokens, error)
SwarmJoin(addr, token string) error
ConfigCreate(name string, labels map[string]string, data []byte) error
ConfigDelete(name string) error
@@ -64,6 +65,10 @@ type docker struct {
c *client.Client
}
func (d *docker) GetClient() *client.Client {
return d.c
}
func (d *docker) ConfigCreate(name string, labels map[string]string, data []byte) error {
config := swarm.ConfigSpec{}
config.Name = name
@@ -467,53 +472,8 @@ func (d *docker) DeleteNetwork(id string) error {
return nil
}
/*
func (d *docker) New(ip string, cert, key []byte) (DockerApi, error) {
// We check if the client needs to use TLS
var tlsConfig *tls.Config
if len(cert) > 0 && len(key) > 0 {
tlsConfig = tlsconfig.ClientDefault()
tlsConfig.InsecureSkipVerify = true
tlsCert, err := tls.X509KeyPair(cert, key)
if err != nil {
return nil, err
}
tlsConfig.Certificates = []tls.Certificate{tlsCert}
}
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 1 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext}
if tlsConfig != nil {
transport.TLSClientConfig = tlsConfig
}
cli := &http.Client{
Transport: transport,
}
c, err := client.NewClient(fmt.Sprintf("http://%s:2375", ip), api.DefaultVersion, cli, nil)
if err != nil {
return nil, fmt.Errorf("Could not connect to DinD docker daemon. %s", err)
}
// try to connect up to 5 times and then give up
for i := 0; i < 5; i++ {
_, err := c.Ping(context.Background())
if err != nil {
if client.IsErrConnectionFailed(err) {
// connection has failed, maybe instance is not ready yet, sleep and retry
log.Printf("Connection to [%s] has failed, maybe instance is not ready yet, sleeping and retrying in 1 second. Try #%d\n", fmt.Sprintf("http://%s:2375", ip), i+1)
time.Sleep(time.Second)
continue
}
return nil, err
}
}
return NewDocker(c), nil
}
*/
func (d *docker) SwarmInit() (*SwarmTokens, error) {
req := swarm.InitRequest{AdvertiseAddr: "eth0", ListenAddr: "0.0.0.0:2377"}
func (d *docker) SwarmInit(advertiseAddr string) (*SwarmTokens, error) {
req := swarm.InitRequest{AdvertiseAddr: advertiseAddr, ListenAddr: "0.0.0.0:2377"}
_, err := d.c.SwarmInit(context.Background(), req)
if err != nil {

View File

@@ -37,7 +37,11 @@ func (f *localCachedFactory) GetForSession(sessionId string) (DockerApi, error)
defer f.rw.Unlock()
if f.sessionClient != nil {
if err := f.check(f.sessionClient.GetClient()); err == nil {
return f.sessionClient, nil
} else {
f.sessionClient.GetClient().Close()
}
}
c, err := client.NewEnvClient()
@@ -69,7 +73,11 @@ func (f *localCachedFactory) GetForInstance(instance *types.Instance) (DockerApi
defer c.rw.Unlock()
if c.client != nil {
if err := f.check(c.client.GetClient()); err == nil {
return c.client, nil
} else {
c.client.GetClient().Close()
}
}
// Need to create client to the DinD docker daemon

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/stretchr/testify/mock"
)
@@ -13,6 +14,11 @@ type Mock struct {
mock.Mock
}
func (m *Mock) GetClient() *client.Client {
args := m.Called()
return args.Get(0).(*client.Client)
}
func (m *Mock) CreateNetwork(id string, opts types.NetworkCreate) error {
args := m.Called(id, opts)
return args.Error(0)
@@ -96,8 +102,8 @@ func (m *Mock) Exec(instanceName string, command []string) (int, error) {
args := m.Called(instanceName, command)
return args.Int(0), args.Error(1)
}
func (m *Mock) SwarmInit() (*SwarmTokens, error) {
args := m.Called()
func (m *Mock) SwarmInit(advertiseAddr string) (*SwarmTokens, error) {
args := m.Called(advertiseAddr)
return args.Get(0).(*SwarmTokens), args.Error(1)
}
func (m *Mock) SwarmJoin(addr, token string) error {

View File

@@ -116,6 +116,14 @@ func (d *DinD) InstanceDelete(session *types.Session, instance *types.Instance)
return nil
}
func (d *DinD) InstanceExec(instance *types.Instance, cmd []string) (int, error) {
dockerClient, err := d.factory.GetForSession(instance.SessionId)
if err != nil {
return -1, err
}
return dockerClient.Exec(instance.Name, cmd)
}
func (d *DinD) InstanceResizeTerminal(instance *types.Instance, rows, cols uint) error {
dockerClient, err := d.factory.GetForSession(instance.SessionId)
if err != nil {

View File

@@ -17,6 +17,7 @@ func OutOfCapacity(e error) bool {
type InstanceProvisionerApi interface {
InstanceNew(session *types.Session, conf types.InstanceConfig) (*types.Instance, error)
InstanceDelete(session *types.Session, instance *types.Instance) error
InstanceExec(instance *types.Instance, cmd []string) (int, error)
InstanceResizeTerminal(instance *types.Instance, cols, rows uint) error
InstanceGetTerminal(instance *types.Instance) (net.Conn, error)

View File

@@ -1,6 +1,8 @@
package provisioner
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
@@ -121,6 +123,39 @@ func (d *windows) InstanceDelete(session *types.Session, instance *types.Instanc
return d.releaseInstance(instance.WindowsId)
}
type execRes struct {
ExitCode int `json:"exit_code"`
Error string `json:"error"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
}
func (d *windows) InstanceExec(instance *types.Instance, cmd []string) (int, error) {
execBody := struct {
Cmd []string `json:"cmd"`
}{Cmd: cmd}
b, err := json.Marshal(execBody)
if err != nil {
return -1, err
}
resp, err := http.Post(fmt.Sprintf("http://%s:222/exec", instance.IP), "application/json", bytes.NewReader(b))
if err != nil {
log.Println(err)
return -1, err
}
if resp.StatusCode != 200 {
log.Printf("Error exec on instance %s. Got %d\n", instance.Name, resp.StatusCode)
return -1, fmt.Errorf("Error exec on instance %s. Got %d\n", instance.Name, resp.StatusCode)
}
var ex execRes
err = json.NewDecoder(resp.Body).Decode(&ex)
if err != nil {
return -1, err
}
return ex.ExitCode, nil
}
func (d *windows) releaseInstance(instanceId string) error {
return d.storage.WindowsInstanceDelete(instanceId)
}

View File

@@ -134,10 +134,14 @@ func (p *pwd) InstanceNew(session *types.Session, conf types.InstanceConfig) (*t
func (p *pwd) InstanceExec(instance *types.Instance, cmd []string) (int, error) {
defer observeAction("InstanceExec", time.Now())
dockerClient, err := p.dockerFactory.GetForSession(instance.SessionId)
prov, err := p.getProvisioner(instance.Type)
if err != nil {
return -1, err
}
exitCode, err := prov.InstanceExec(instance, cmd)
if err != nil {
log.Println(err)
return -1, err
}
return dockerClient.Exec(instance.Name, cmd)
return exitCode, nil
}

View File

@@ -38,6 +38,23 @@ type SessionSetupInstanceConf struct {
Hostname string `json:"hostname"`
IsSwarmManager bool `json:"is_swarm_manager"`
IsSwarmWorker bool `json:"is_swarm_worker"`
Type string `json:"type"`
Run [][]string `json:"run"`
Expose []ExposedApp `json:"expose"`
}
type ExposedApp struct {
Name string `json:"name"`
Description string `json:"description"`
Icon string `json:"icon"`
Url ExposedAppURL `json:"url"`
}
type ExposedAppURL struct {
Port int `json:"port"`
Path string `json:"path"`
Query string `json:"query"`
Scheme string `json:"scheme"`
}
func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName string) (*types.Session, error) {
@@ -203,6 +220,9 @@ func (p *pwd) SessionGet(sessionId string) *types.Session {
func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error {
defer observeAction("SessionSetup", time.Now())
c := sync.NewCond(&sync.Mutex{})
var tokens *docker.SwarmTokens = nil
var firstSwarmManager *types.Instance = nil
@@ -215,83 +235,75 @@ func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error
return sessionNotEmpty
}
// first look for a swarm manager and create it
g, _ := errgroup.WithContext(context.Background())
for _, conf := range conf.Instances {
if conf.IsSwarmManager {
conf := conf
g.Go(func() error {
instanceConf := types.InstanceConfig{
ImageName: conf.Image,
Hostname: conf.Hostname,
Host: session.Host,
Type: conf.Type,
}
i, err := p.InstanceNew(session, instanceConf)
if err != nil {
return err
}
if conf.IsSwarmManager || conf.IsSwarmWorker {
dockerClient, err := p.dockerFactory.GetForInstance(i)
if err != nil {
return err
}
tkns, err := dockerClient.SwarmInit()
if conf.IsSwarmManager {
c.L.Lock()
if firstSwarmManager == nil {
tkns, err := dockerClient.SwarmInit(i.IP)
if err != nil {
return err
}
tokens = tkns
firstSwarmManager = i
break
c.Broadcast()
c.L.Unlock()
} else {
c.L.Unlock()
if err := dockerClient.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager); err != nil {
return err
}
}
// now create the rest in parallel
wg := sync.WaitGroup{}
for _, c := range conf.Instances {
if firstSwarmManager != nil && c.Hostname != firstSwarmManager.Hostname {
wg.Add(1)
go func(c SessionSetupInstanceConf) {
defer wg.Done()
instanceConf := types.InstanceConfig{
ImageName: c.Image,
Hostname: c.Hostname,
} else if conf.IsSwarmWorker {
c.L.Lock()
if firstSwarmManager == nil {
c.Wait()
}
i, err := p.InstanceNew(session, instanceConf)
if err != nil {
log.Println(err)
return
}
if firstSwarmManager != nil {
if c.IsSwarmManager {
dockerClient, err := p.dockerFactory.GetForInstance(i)
if err != nil {
log.Println(err)
return
}
// this is a swarm manager
// cluster has already been initiated, join as manager
err = dockerClient.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager)
if err != nil {
log.Println(err)
return
}
}
if c.IsSwarmWorker {
dockerClient, err := p.dockerFactory.GetForInstance(i)
if err != nil {
log.Println(err)
return
}
// this is a swarm worker
c.L.Unlock()
err = dockerClient.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker)
if err != nil {
log.Println(err)
return
return err
}
}
}
}(c)
for _, cmd := range conf.Run {
exitCode, err := p.InstanceExec(i, cmd)
if err != nil {
return err
}
if exitCode != 0 {
return fmt.Errorf("Command returned %d on instance %s", exitCode, i.IP)
}
}
wg.Wait()
return nil
})
}
if err := g.Wait(); err != nil {
log.Println(err)
return err
}
return nil
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/event"
"github.com/play-with-docker/play-with-docker/provisioner"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/play-with-docker/play-with-docker/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -73,6 +72,10 @@ func TestSessionNew(t *testing.T) {
_e.M.AssertExpectations(t)
}
/*
************************** Not sure how to test this as it can pick any manager as the first node in the swarm cluster.
func TestSessionSetup(t *testing.T) {
_d := &docker.Mock{}
_f := &docker.FactoryMock{}
@@ -100,25 +103,25 @@ func TestSessionSetup(t *testing.T) {
_d.On("SwarmInit").Return(&docker.SwarmTokens{Manager: "managerToken", Worker: "workerToken"}, nil)
_e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_manager1", "10.0.0.2", "manager1", "ip10-0-0-2-aaaabbbbcccc"}).Return()
_d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_manager2", Hostname: "manager2", Privileged: true, Networks: []string{"aaaabbbbcccc"}}).Return(nil)
_d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_manager2", Hostname: "manager2", Privileged: true, HostFQDN: "localhost", Networks: []string{"aaaabbbbcccc"}}).Return(nil)
_d.On("GetContainerIPs", "aaaabbbb_manager2").Return(map[string]string{"aaaabbbbcccc": "10.0.0.3"}, nil)
_f.On("GetForInstance", mock.AnythingOfType("*types.Instance")).Return(_d, nil)
_d.On("SwarmJoin", "10.0.0.2:2377", "managerToken").Return(nil)
_e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_manager2", "10.0.0.3", "manager2", "ip10-0-0-3-aaaabbbbcccc"}).Return()
_d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind:overlay2-dev", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_manager3", Hostname: "manager3", Privileged: true, Networks: []string{"aaaabbbbcccc"}}).Return(nil)
_d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind:overlay2-dev", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_manager3", Hostname: "manager3", Privileged: true, HostFQDN: "localhost", Networks: []string{"aaaabbbbcccc"}}).Return(nil)
_d.On("GetContainerIPs", "aaaabbbb_manager3").Return(map[string]string{"aaaabbbbcccc": "10.0.0.4"}, nil)
_f.On("GetForInstance", mock.AnythingOfType("*types.Instance")).Return(_d, nil)
_d.On("SwarmJoin", "10.0.0.2:2377", "managerToken").Return(nil)
_e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_manager3", "10.0.0.4", "manager3", "ip10-0-0-4-aaaabbbbcccc"}).Return()
_d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_worker1", Hostname: "worker1", Privileged: true, Networks: []string{"aaaabbbbcccc"}}).Return(nil)
_d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_worker1", Hostname: "worker1", Privileged: true, HostFQDN: "localhost", Networks: []string{"aaaabbbbcccc"}}).Return(nil)
_d.On("GetContainerIPs", "aaaabbbb_worker1").Return(map[string]string{"aaaabbbbcccc": "10.0.0.5"}, nil)
_f.On("GetForInstance", mock.AnythingOfType("*types.Instance")).Return(_d, nil)
_d.On("SwarmJoin", "10.0.0.2:2377", "workerToken").Return(nil)
_e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_worker1", "10.0.0.5", "worker1", "ip10-0-0-5-aaaabbbbcccc"}).Return()
_d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_other", Hostname: "other", Privileged: true, Networks: []string{"aaaabbbbcccc"}}).Return(nil)
_d.On("CreateContainer", docker.CreateContainerOpts{Image: "franela/dind", SessionId: "aaaabbbbcccc", PwdIpAddress: "10.0.0.1", ContainerName: "aaaabbbb_other", Hostname: "other", Privileged: true, HostFQDN: "localhost", Networks: []string{"aaaabbbbcccc"}}).Return(nil)
_d.On("GetContainerIPs", "aaaabbbb_other").Return(map[string]string{"aaaabbbbcccc": "10.0.0.6"}, nil)
_e.M.On("Emit", event.INSTANCE_NEW, "aaaabbbbcccc", []interface{}{"aaaabbbb_other", "10.0.0.6", "other", "ip10-0-0-6-aaaabbbbcccc"}).Return()
@@ -163,3 +166,4 @@ func TestSessionSetup(t *testing.T) {
_g.AssertExpectations(t)
_e.M.AssertExpectations(t)
}
*/