From ed7cefcf9cdaf70d7ce505ce8e2acddf1f565310 Mon Sep 17 00:00:00 2001 From: Marcos Lilljedahl Date: Fri, 4 Aug 2017 21:54:03 -0300 Subject: [PATCH] Add first version of the windows ASG provider --- config/config.go | 10 ++ docker/docker.go | 23 +++-- provisioner/windows.go | 212 +++++++++++++++++++++++++++++++++++++++-- pwd/pwd.go | 2 +- pwd/types/instance.go | 6 ++ pwd/types/session.go | 25 ++--- storage/file.go | 46 +++++++++ storage/file_test.go | 75 +++++++++++++++ storage/mock.go | 15 +++ storage/storage.go | 3 + 10 files changed, 388 insertions(+), 29 deletions(-) diff --git a/config/config.go b/config/config.go index b43bc82..7de0c62 100644 --- a/config/config.go +++ b/config/config.go @@ -46,6 +46,16 @@ func GetDindImageName() string { } return dindImage } + +func GetSSHImage() string { + sshImage := os.Getenv("SSH_IMAGE") + defaultSSHImage := "franela/ssh" + if len(sshImage) == 0 { + return defaultSSHImage + } + return sshImage +} + func GetDuration(reqDur string) time.Duration { var defaultDuration = 4 * time.Hour if reqDur != "" { diff --git a/docker/docker.go b/docker/docker.go index 054874e..f1a2a50 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -192,16 +192,17 @@ func (d *docker) DeleteContainer(id string) error { } type CreateContainerOpts struct { - Image string - SessionId string - PwdIpAddress string - ContainerName string - Hostname string - ServerCert []byte - ServerKey []byte - CACert []byte - Privileged bool - HostFQDN string + Image string + WindowsEndpoint string + SessionId string + PwdIpAddress string + ContainerName string + Hostname string + ServerCert []byte + ServerKey []byte + CACert []byte + Privileged bool + HostFQDN string } func (d *docker) CreateContainer(opts CreateContainerOpts) (string, error) { @@ -211,6 +212,8 @@ func (d *docker) CreateContainer(opts CreateContainerOpts) (string, error) { env := []string{} + env = append(env, fmt.Sprintf("WINDOWS_ENDPOINT=%s", opts.WindowsEndpoint)) + // Write certs to container cert dir if len(opts.ServerCert) > 0 { env = append(env, `DOCKER_TLSCERT=\/var\/run\/pwd\/certs\/cert.pem`) diff --git a/provisioner/windows.go b/provisioner/windows.go index 30c29f4..18b8b81 100644 --- a/provisioner/windows.go +++ b/provisioner/windows.go @@ -1,36 +1,151 @@ package provisioner import ( + "errors" "fmt" "io" "net" + "strings" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/docker" "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/play-with-docker/play-with-docker/router" + "github.com/play-with-docker/play-with-docker/storage" ) +var asgService *autoscaling.AutoScaling +var ec2Service *ec2.EC2 + +func init() { + // Create a session to share configuration, and load external configuration. + sess := session.Must(session.NewSession()) + // + // // Create the service's client with the session. + asgService = autoscaling.New(sess) + ec2Service = ec2.New(sess) +} + type windows struct { factory docker.FactoryApi + storage storage.StorageApi } -func NewWindows(f docker.FactoryApi) *windows { - return &windows{factory: f} +type instanceInfo struct { + publicIP string + privateIP string + id string +} + +func NewWindowsASG(f docker.FactoryApi, st storage.StorageApi) *windows { + return &windows{factory: f, storage: st} } func (d *windows) InstanceNew(session *types.Session, conf types.InstanceConfig) (*types.Instance, error) { - return nil, fmt.Errorf("Not implemented") + + conf.ImageName = config.GetSSHImage() + + winfo, err := d.getWindowsInstanceInfo(session.Id) + if err != nil { + return nil, err + } + + if conf.Hostname == "" { + var nodeName string + for i := 1; ; i++ { + nodeName = fmt.Sprintf("node%d", i) + exists := checkHostnameExists(session, nodeName) + if !exists { + break + } + } + conf.Hostname = nodeName + } + containerName := fmt.Sprintf("%s_%s", session.Id[:8], conf.Hostname) + opts := docker.CreateContainerOpts{ + Image: conf.ImageName, + WindowsEndpoint: winfo.publicIP, + SessionId: session.Id, + PwdIpAddress: session.PwdIpAddress, + ContainerName: containerName, + Hostname: conf.Hostname, + ServerCert: conf.ServerCert, + ServerKey: conf.ServerKey, + CACert: conf.CACert, + Privileged: false, + HostFQDN: conf.Host, + } + + dockerClient, err := d.factory.GetForSession(session.Id) + if err != nil { + return nil, err + } + ip, err := dockerClient.CreateContainer(opts) + if err != nil { + return nil, err + } + + instance := &types.Instance{} + instance.Image = opts.Image + instance.IP = winfo.privateIP + instance.SessionId = session.Id + instance.Name = containerName + instance.WindowsId = winfo.id + instance.Hostname = conf.Hostname + instance.Cert = conf.Cert + instance.Key = conf.Key + instance.Type = conf.Type + instance.ServerCert = conf.ServerCert + instance.ServerKey = conf.ServerKey + instance.CACert = conf.CACert + instance.Session = session + instance.ProxyHost = router.EncodeHost(session.Id, ip, router.HostOpts{}) + instance.SessionHost = session.Host + // For now this condition holds through. In the future we might need a more complex logic. + instance.IsDockerHost = opts.Privileged + + return instance, nil + } func (d *windows) InstanceDelete(session *types.Session, instance *types.Instance) error { - return fmt.Errorf("Not implemented") + dockerClient, err := d.factory.GetForSession(session.Id) + if err != nil { + return err + } + err = dockerClient.DeleteContainer(instance.Name) + if err != nil && !strings.Contains(err.Error(), "No such container") { + return err + } + + err = d.storage.InstanceDeleteWindows(session.Id, instance.WindowsId) + if err != nil { + return err + } + + // TODO trigger deletion in AWS + + return nil } func (d *windows) InstanceResizeTerminal(instance *types.Instance, cols, rows uint) error { - return fmt.Errorf("Not implemented") + dockerClient, err := d.factory.GetForSession(instance.SessionId) + if err != nil { + return err + } + return dockerClient.ContainerResize(instance.Name, rows, cols) } func (d *windows) InstanceGetTerminal(instance *types.Instance) (net.Conn, error) { - return nil, fmt.Errorf("Not implemented") + dockerClient, err := d.factory.GetForSession(instance.SessionId) + if err != nil { + return nil, err + } + return dockerClient.CreateAttachConnection(instance.Name) } func (d *windows) InstanceUploadFromUrl(instance *types.Instance, fileName, dest, url string) error { @@ -40,3 +155,88 @@ func (d *windows) InstanceUploadFromUrl(instance *types.Instance, fileName, dest func (d *windows) InstanceUploadFromReader(instance *types.Instance, fileName, dest string, reader io.Reader) error { return fmt.Errorf("Not implemented") } + +func (d *windows) getWindowsInstanceInfo(sessionId string) (*instanceInfo, error) { + + input := &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: []*string{aws.String("pwd-windows")}, + } + out, err := asgService.DescribeAutoScalingGroups(input) + + if err != nil { + return nil, err + } + + // there should always be one asg + instances := out.AutoScalingGroups[0].Instances + availInstances := make([]string, len(instances)) + + for i, inst := range instances { + availInstances[i] = *inst.InstanceId + } + + assignedInstances, err := d.storage.InstanceGetAllWindows() + assignedInstancesIds := []string{} + for _, ai := range assignedInstances { + assignedInstancesIds = append(assignedInstancesIds, ai.ID) + } + + if err != nil { + return nil, err + } + + avInstanceId := d.pickFreeInstance(sessionId, availInstances, assignedInstancesIds) + + if len(avInstanceId) == 0 { + return nil, errors.New("No Windows instance available") + } + + iout, err := ec2Service.DescribeInstances(&ec2.DescribeInstancesInput{ + InstanceIds: []*string{aws.String(avInstanceId)}, + }) + if err != nil { + // TODO retry x times and free the instance that was picked? + return nil, err + } + + instance := iout.Reservations[0].Instances[0] + + instanceInfo := &instanceInfo{ + publicIP: *instance.PublicIpAddress, + privateIP: *instance.PrivateIpAddress, + id: avInstanceId, + } + + //TODO check for free instance, ASG capacity and return + + return instanceInfo, nil +} + +// select free instance and lock it into db. +// additionally check if ASG needs to be resized +func (d *windows) pickFreeInstance(sessionId string, availInstances, assignedInstances []string) string { + + for _, av := range availInstances { + found := false + for _, as := range assignedInstances { + if av == as { + found = true + break + } + + } + + if !found { + fmt.Println("ABOUT TO PERSIST", av) + err := d.storage.InstanceCreateWindows(&types.WindowsInstance{SessionId: sessionId, ID: av}) + if err != nil { + // TODO either storage error or instance is already assigned (race condition) + } + return av + } + } + + // all availalbe instances are assigned + return "" + +} diff --git a/pwd/pwd.go b/pwd/pwd.go index 72d777a..4329092 100644 --- a/pwd/pwd.go +++ b/pwd/pwd.go @@ -103,7 +103,7 @@ type PWDApi interface { } func NewPWD(f docker.FactoryApi, e event.EventApi, s storage.StorageApi) *pwd { - return &pwd{dockerFactory: f, event: e, storage: s, generator: xidGenerator{}, windowsProvisioner: provisioner.NewWindows(f), dindProvisioner: provisioner.NewDinD(f)} + return &pwd{dockerFactory: f, event: e, storage: s, generator: xidGenerator{}, windowsProvisioner: provisioner.NewWindowsASG(f, s), dindProvisioner: provisioner.NewDinD(f)} } func (p *pwd) getProvisioner(t string) (provisioner.ProvisionerApi, error) { diff --git a/pwd/types/instance.go b/pwd/types/instance.go index 88d1299..fc6eb8b 100644 --- a/pwd/types/instance.go +++ b/pwd/types/instance.go @@ -19,6 +19,12 @@ type Instance struct { Type string `json:"type" bson:"type"` Session *Session `json:"-" bson:"-"` ctx context.Context `json:"-" bson:"-"` + WindowsId string `json:"-" bson:"windows_id"` +} + +type WindowsInstance struct { + ID string `bson:"id"` + SessionId string `bson:"session_id"` } type InstanceConfig struct { diff --git a/pwd/types/session.go b/pwd/types/session.go index c7ac55b..944f799 100644 --- a/pwd/types/session.go +++ b/pwd/types/session.go @@ -6,18 +6,19 @@ import ( ) type Session struct { - Id string `json:"id"` - Instances map[string]*Instance `json:"instances" bson:"-"` - CreatedAt time.Time `json:"created_at"` - ExpiresAt time.Time `json:"expires_at"` - PwdIpAddress string `json:"pwd_ip_address"` - Ready bool `json:"ready"` - Stack string `json:"stack"` - StackName string `json:"stack_name"` - ImageName string `json:"image_name"` - Host string `json:"host"` - Clients []*Client `json:"-" bson:"-"` - rw sync.Mutex `json:"-"` + Id string `json:"id"` + Instances map[string]*Instance `json:"instances" bson:"-"` + CreatedAt time.Time `json:"created_at"` + ExpiresAt time.Time `json:"expires_at"` + PwdIpAddress string `json:"pwd_ip_address"` + Ready bool `json:"ready"` + Stack string `json:"stack"` + StackName string `json:"stack_name"` + ImageName string `json:"image_name"` + Host string `json:"host"` + Clients []*Client `json:"-" bson:"-"` + WindowsAssigned []*WindowsInstance `json:"-" bson:"-"` + rw sync.Mutex `json:"-"` } func (s *Session) Lock() { diff --git a/storage/file.go b/storage/file.go index 0187adb..c1c75b3 100644 --- a/storage/file.go +++ b/storage/file.go @@ -48,6 +48,19 @@ func (store *storage) SessionPut(s *types.Session) error { return store.save() } +func (store *storage) InstanceGetAllWindows() ([]*types.WindowsInstance, error) { + store.rw.Lock() + defer store.rw.Unlock() + + instances := []*types.WindowsInstance{} + + for _, s := range store.db { + instances = append(instances, s.WindowsAssigned...) + } + + return instances, nil +} + func (store *storage) InstanceGet(sessionId, name string) (*types.Instance, error) { store.rw.Lock() defer store.rw.Unlock() @@ -94,6 +107,20 @@ func (store *storage) InstanceCreate(sessionId string, instance *types.Instance) return store.save() } +func (store *storage) InstanceCreateWindows(instance *types.WindowsInstance) error { + store.rw.Lock() + defer store.rw.Unlock() + + s, found := store.db[instance.SessionId] + if !found { + return fmt.Errorf("Session %s", notFound) + } + + s.WindowsAssigned = append(s.WindowsAssigned, instance) + + return store.save() +} + func (store *storage) InstanceDelete(sessionId, name string) error { store.rw.Lock() defer store.rw.Unlock() @@ -111,6 +138,25 @@ func (store *storage) InstanceDelete(sessionId, name string) error { return store.save() } +func (store *storage) InstanceDeleteWindows(sessionId, id string) error { + store.rw.Lock() + defer store.rw.Unlock() + + s, found := store.db[sessionId] + if !found { + return fmt.Errorf("Session %s", notFound) + } + + for i, winst := range s.WindowsAssigned { + if winst.ID == id { + s.WindowsAssigned = append(s.WindowsAssigned[:i], s.WindowsAssigned[i+1:]...) + } + + } + + return store.save() +} + func (store *storage) SessionCount() (int, error) { store.rw.Lock() defer store.rw.Unlock() diff --git a/storage/file_test.go b/storage/file_test.go index 28a5e3e..aeda53d 100644 --- a/storage/file_test.go +++ b/storage/file_test.go @@ -168,6 +168,31 @@ func TestInstanceGet(t *testing.T) { assert.Equal(t, i1, foundInstance) } +func TestInstanceGetAllWindows(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + tmpfile.Close() + os.Remove(tmpfile.Name()) + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + w1 := []*types.WindowsInstance{{ID: "one"}, {ID: "two"}} + w2 := []*types.WindowsInstance{{ID: "three"}, {ID: "four"}} + s1 := &types.Session{Id: "session1", WindowsAssigned: w1} + s2 := &types.Session{Id: "session2", WindowsAssigned: w2} + err = storage.SessionPut(s1) + err = storage.SessionPut(s2) + assert.Nil(t, err) + + allw, err := storage.InstanceGetAllWindows() + assert.Nil(t, err) + assert.Equal(t, allw, append(w1, w2...)) +} + func TestInstanceCreate(t *testing.T) { tmpfile, err := ioutil.TempFile("", "pwd") if err != nil { @@ -195,6 +220,56 @@ func TestInstanceCreate(t *testing.T) { } +func TestInstanceCreateWindows(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + tmpfile.Close() + os.Remove(tmpfile.Name()) + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + + s1 := &types.Session{Id: "session1"} + i1 := &types.WindowsInstance{SessionId: s1.Id, ID: "some id"} + err = storage.SessionPut(s1) + assert.Nil(t, err) + err = storage.InstanceCreateWindows(i1) + assert.Nil(t, err) + + loadedSession, err := storage.SessionGet("session1") + assert.Nil(t, err) + + assert.Equal(t, i1, loadedSession.WindowsAssigned[0]) +} + +func TestInstanceDeleteWindows(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "pwd") + if err != nil { + log.Fatal(err) + } + tmpfile.Close() + os.Remove(tmpfile.Name()) + defer os.Remove(tmpfile.Name()) + + storage, err := NewFileStorage(tmpfile.Name()) + + assert.Nil(t, err) + + s1 := &types.Session{Id: "session1", WindowsAssigned: []*types.WindowsInstance{{ID: "one"}}} + err = storage.SessionPut(s1) + assert.Nil(t, err) + + err = storage.InstanceDeleteWindows(s1.Id, "one") + assert.Nil(t, err) + + found, err := storage.SessionGet(s1.Id) + assert.Equal(t, 0, len(found.WindowsAssigned)) +} + func TestCounts(t *testing.T) { tmpfile, err := ioutil.TempFile("", "pwd") if err != nil { diff --git a/storage/mock.go b/storage/mock.go index 8f75313..f8ef5d8 100644 --- a/storage/mock.go +++ b/storage/mock.go @@ -39,6 +39,11 @@ func (m *Mock) InstanceGet(sessionId, name string) (*types.Instance, error) { return args.Get(0).(*types.Instance), args.Error(1) } +func (m *Mock) InstanceGetAllWindows() ([]*types.WindowsInstance, error) { + args := m.Called() + return args.Get(0).([]*types.WindowsInstance), args.Error(1) +} + func (m *Mock) InstanceFindByIP(sessionId, ip string) (*types.Instance, error) { args := m.Called(sessionId, ip) return args.Get(0).(*types.Instance), args.Error(1) @@ -49,11 +54,21 @@ func (m *Mock) InstanceCreate(sessionId string, instance *types.Instance) error return args.Error(0) } +func (m *Mock) InstanceCreateWindows(instance *types.WindowsInstance) error { + args := m.Called(instance) + return args.Error(0) +} + func (m *Mock) InstanceDelete(sessionId, instanceName string) error { args := m.Called(sessionId, instanceName) return args.Error(0) } +func (m *Mock) InstanceDeleteWindows(sessionId, instanceId string) error { + args := m.Called(sessionId, instanceId) + return args.Error(0) +} + func (m *Mock) InstanceCount() (int, error) { args := m.Called() return args.Int(0), args.Error(1) diff --git a/storage/storage.go b/storage/storage.go index 158f708..c68333f 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -19,5 +19,8 @@ type StorageApi interface { InstanceFindByIP(session, ip string) (*types.Instance, error) InstanceCreate(sessionId string, instance *types.Instance) error InstanceDelete(sessionId, instanceName string) error + InstanceDeleteWindows(sessionId, instanceId string) error InstanceCount() (int, error) + InstanceGetAllWindows() ([]*types.WindowsInstance, error) + InstanceCreateWindows(*types.WindowsInstance) error }