diff --git a/api.go b/api.go index 277ea0c..a55b0bd 100644 --- a/api.go +++ b/api.go @@ -11,7 +11,6 @@ import ( "github.com/miekg/dns" "github.com/play-with-docker/play-with-docker/config" "github.com/play-with-docker/play-with-docker/handlers" - "github.com/play-with-docker/play-with-docker/services" "github.com/play-with-docker/play-with-docker/templates" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/urfave/negroni" @@ -20,6 +19,7 @@ import ( func main() { config.ParseFlags() + handlers.Bootstrap() bypassCaptcha := len(os.Getenv("GOOGLE_RECAPTCHA_DISABLED")) > 0 @@ -40,14 +40,7 @@ func main() { } }() - server := services.CreateWSServer() - server.On("connection", handlers.WS) - server.On("error", handlers.WSError) - - err := services.LoadSessionsFromDisk() - if err != nil && !os.IsNotExist(err) { - log.Fatal("Error decoding sessions from disk ", err) - } + server := handlers.Broadcast.GetHandler() r := mux.NewRouter() corsRouter := mux.NewRouter() diff --git a/config/config.go b/config/config.go index 0981d94..b165620 100644 --- a/config/config.go +++ b/config/config.go @@ -2,7 +2,9 @@ package config import ( "flag" + "os" "regexp" + "time" ) const ( @@ -33,3 +35,27 @@ func ParseFlags() { flag.Float64Var(&MaxLoadAvg, "maxload", 100, "Maximum allowed load average before failing ping requests") flag.Parse() } +func GetDindImageName() string { + dindImage := os.Getenv("DIND_IMAGE") + defaultDindImageName := "franela/dind" + if len(dindImage) == 0 { + dindImage = defaultDindImageName + } + return dindImage +} +func GetDuration(reqDur string) time.Duration { + var defaultDuration = 4 * time.Hour + if reqDur != "" { + if dur, err := time.ParseDuration(reqDur); err == nil && dur <= defaultDuration { + return dur + } + return defaultDuration + } + + envDur := os.Getenv("EXPIRY") + if dur, err := time.ParseDuration(envDur); err == nil { + return dur + } + + return defaultDuration +} diff --git a/docker/docker.go b/docker/docker.go index b8d485c..65a0f06 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -1,5 +1,365 @@ package docker -type Docker interface { +import ( + "archive/tar" + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "os" + "strconv" + "strings" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/client" +) + +const ( + Byte = 1 + Kilobyte = 1024 * Byte + Megabyte = 1024 * Kilobyte +) + +type DockerApi interface { CreateNetwork(id string) error + ConnectNetwork(container, network, ip string) (string, error) + GetDaemonInfo() (types.Info, error) + GetSwarmPorts() ([]string, []uint16, error) + GetPorts() ([]uint16, error) + GetContainerStats(name string) (io.ReadCloser, error) + ContainerResize(name string, rows, cols uint) error + CreateAttachConnection(name string) (net.Conn, error) + CopyToContainer(containerName, destination, fileName string, content io.Reader) error + DeleteContainer(id string) error + CreateContainer(opts CreateContainerOpts) (string, error) + ExecAttach(instanceName string, command []string, out io.Writer) (int, error) + DisconnectNetwork(containerId, networkId string) error + DeleteNetwork(id string) error + Exec(instanceName string, command []string) (int, error) +} + +type docker struct { + c *client.Client +} + +func (d *docker) CreateNetwork(id string) error { + opts := types.NetworkCreate{Driver: "overlay", Attachable: true} + _, err := d.c.NetworkCreate(context.Background(), id, opts) + + if err != nil { + log.Printf("Starting session err [%s]\n", err) + + return err + } + + return nil +} + +func (d *docker) ConnectNetwork(containerId, networkId, ip string) (string, error) { + settings := &network.EndpointSettings{} + if ip != "" { + settings.IPAddress = ip + } + err := d.c.NetworkConnect(context.Background(), networkId, containerId, settings) + + if err != nil && !strings.Contains(err.Error(), "already exists") { + log.Printf("Connection container to network err [%s]\n", err) + + return "", err + } + + // Obtain the IP of the PWD container in this network + container, err := d.c.ContainerInspect(context.Background(), containerId) + if err != nil { + return "", err + } + + n, found := container.NetworkSettings.Networks[networkId] + if !found { + return "", fmt.Errorf("Container [%s] connected to the network [%s] but couldn't obtain it's IP address", containerId, networkId) + } + + return n.IPAddress, nil +} + +func (d *docker) GetDaemonInfo() (types.Info, error) { + return d.c.Info(context.Background()) +} + +func (d *docker) GetSwarmPorts() ([]string, []uint16, error) { + hosts := []string{} + ports := []uint16{} + + nodesIdx := map[string]string{} + nodes, nodesErr := d.c.NodeList(context.Background(), types.NodeListOptions{}) + if nodesErr != nil { + return nil, nil, nodesErr + } + for _, n := range nodes { + nodesIdx[n.ID] = n.Description.Hostname + hosts = append(hosts, n.Description.Hostname) + } + + services, err := d.c.ServiceList(context.Background(), types.ServiceListOptions{}) + if err != nil { + return nil, nil, err + } + for _, service := range services { + for _, p := range service.Endpoint.Ports { + ports = append(ports, uint16(p.PublishedPort)) + } + } + + return hosts, ports, nil +} + +func (d *docker) GetPorts() ([]uint16, error) { + opts := types.ContainerListOptions{} + containers, err := d.c.ContainerList(context.Background(), opts) + if err != nil { + return nil, err + } + + openPorts := []uint16{} + for _, c := range containers { + for _, p := range c.Ports { + // When port is not published on the host docker return public port as 0, so we need to avoid it + if p.PublicPort != 0 { + openPorts = append(openPorts, p.PublicPort) + } + } + } + + return openPorts, nil +} + +func (d *docker) GetContainerStats(name string) (io.ReadCloser, error) { + stats, err := d.c.ContainerStats(context.Background(), name, false) + + return stats.Body, err +} + +func (d *docker) ContainerResize(name string, rows, cols uint) error { + return d.c.ContainerResize(context.Background(), name, types.ResizeOptions{Height: rows, Width: cols}) +} + +func (d *docker) CreateAttachConnection(name string) (net.Conn, error) { + ctx := context.Background() + + conf := types.ContainerAttachOptions{true, true, true, true, "ctrl-^,ctrl-^", true} + conn, err := d.c.ContainerAttach(ctx, name, conf) + if err != nil { + return nil, err + } + + return conn.Conn, nil +} + +func (d *docker) CopyToContainer(containerName, destination, fileName string, content io.Reader) error { + r, w := io.Pipe() + b, readErr := ioutil.ReadAll(content) + if readErr != nil { + return readErr + } + t := tar.NewWriter(w) + go func() { + t.WriteHeader(&tar.Header{Name: fileName, Mode: 0600, Size: int64(len(b))}) + t.Write(b) + t.Close() + w.Close() + }() + return d.c.CopyToContainer(context.Background(), containerName, destination, r, types.CopyToContainerOptions{AllowOverwriteDirWithFile: true}) +} + +func (d *docker) DeleteContainer(id string) error { + return d.c.ContainerRemove(context.Background(), id, types.ContainerRemoveOptions{Force: true, RemoveVolumes: true}) +} + +type CreateContainerOpts struct { + Image string + SessionId string + PwdIpAddress string + ContainerName string + Hostname string + ServerCert []byte + ServerKey []byte + CACert []byte +} + +func (d *docker) CreateContainer(opts CreateContainerOpts) (string, error) { + // Make sure directories are available for the new instance container + containerDir := "/var/run/pwd" + containerCertDir := fmt.Sprintf("%s/certs", containerDir) + + env := []string{} + + // Write certs to container cert dir + if len(opts.ServerCert) > 0 { + env = append(env, `DOCKER_TLSCERT=\/var\/run\/pwd\/certs\/cert.pem`) + } + if len(opts.ServerKey) > 0 { + env = append(env, `DOCKER_TLSKEY=\/var\/run\/pwd\/certs\/key.pem`) + } + if len(opts.CACert) > 0 { + // if ca cert is specified, verify that clients that connects present a certificate signed by the CA + env = append(env, `DOCKER_TLSCACERT=\/var\/run\/pwd\/certs\/ca.pem`) + } + if len(opts.ServerCert) > 0 || len(opts.ServerKey) > 0 || len(opts.CACert) > 0 { + // if any of the certs is specified, enable TLS + env = append(env, "DOCKER_TLSENABLE=true") + } else { + env = append(env, "DOCKER_TLSENABLE=false") + } + + h := &container.HostConfig{ + NetworkMode: container.NetworkMode(opts.SessionId), + Privileged: true, + AutoRemove: true, + LogConfig: container.LogConfig{Config: map[string]string{"max-size": "10m", "max-file": "1"}}, + } + + if os.Getenv("APPARMOR_PROFILE") != "" { + h.SecurityOpt = []string{fmt.Sprintf("apparmor=%s", os.Getenv("APPARMOR_PROFILE"))} + } + + var pidsLimit = int64(1000) + if envLimit := os.Getenv("MAX_PROCESSES"); envLimit != "" { + if i, err := strconv.Atoi(envLimit); err == nil { + pidsLimit = int64(i) + } + } + h.Resources.PidsLimit = pidsLimit + h.Resources.Memory = 4092 * Megabyte + t := true + h.Resources.OomKillDisable = &t + + env = append(env, fmt.Sprintf("PWD_IP_ADDRESS=%s", opts.PwdIpAddress)) + cf := &container.Config{Hostname: opts.Hostname, + Image: opts.Image, + Tty: true, + OpenStdin: true, + AttachStdin: true, + AttachStdout: true, + AttachStderr: true, + Env: env, + } + networkConf := &network.NetworkingConfig{ + map[string]*network.EndpointSettings{ + opts.SessionId: &network.EndpointSettings{Aliases: []string{opts.Hostname}}, + }, + } + container, err := d.c.ContainerCreate(context.Background(), cf, h, networkConf, opts.ContainerName) + + if err != nil { + return "", err + } + + if err := d.copyIfSet(opts.ServerCert, "cert.pem", containerCertDir, opts.ContainerName); err != nil { + return "", err + } + if err := d.copyIfSet(opts.ServerKey, "key.pem", containerCertDir, opts.ContainerName); err != nil { + return "", err + } + if err := d.copyIfSet(opts.CACert, "ca.pem", containerCertDir, opts.ContainerName); err != nil { + return "", err + } + + err = d.c.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}) + if err != nil { + return "", err + } + + cinfo, err := d.c.ContainerInspect(context.Background(), container.ID) + if err != nil { + return "", err + } + + return cinfo.NetworkSettings.Networks[opts.SessionId].IPAddress, nil +} + +func (d *docker) copyIfSet(content []byte, fileName, path, containerName string) error { + if len(content) > 0 { + return d.CopyToContainer(containerName, path, fileName, bytes.NewReader(content)) + } + return nil +} + +func (d *docker) ExecAttach(instanceName string, command []string, out io.Writer) (int, error) { + e, err := d.c.ContainerExecCreate(context.Background(), instanceName, types.ExecConfig{Cmd: command, AttachStdout: true, AttachStderr: true, Tty: true}) + if err != nil { + return 0, err + } + resp, err := d.c.ContainerExecAttach(context.Background(), e.ID, types.ExecConfig{AttachStdout: true, AttachStderr: true, Tty: true}) + if err != nil { + return 0, err + } + io.Copy(out, resp.Reader) + var ins types.ContainerExecInspect + for _ = range time.Tick(1 * time.Second) { + ins, err = d.c.ContainerExecInspect(context.Background(), e.ID) + if ins.Running { + continue + } + if err != nil { + return 0, err + } + break + } + return ins.ExitCode, nil + +} + +func (d *docker) Exec(instanceName string, command []string) (int, error) { + e, err := d.c.ContainerExecCreate(context.Background(), instanceName, types.ExecConfig{Cmd: command}) + if err != nil { + return 0, err + } + err = d.c.ContainerExecStart(context.Background(), e.ID, types.ExecStartCheck{}) + if err != nil { + return 0, err + } + var ins types.ContainerExecInspect + for _ = range time.Tick(1 * time.Second) { + ins, err = d.c.ContainerExecInspect(context.Background(), e.ID) + if ins.Running { + continue + } + if err != nil { + return 0, err + } + break + } + return ins.ExitCode, nil +} + +func (d *docker) DisconnectNetwork(containerId, networkId string) error { + err := d.c.NetworkDisconnect(context.Background(), networkId, containerId, true) + + if err != nil { + log.Printf("Disconnection of container from network err [%s]\n", err) + + return err + } + + return nil +} + +func (d *docker) DeleteNetwork(id string) error { + err := d.c.NetworkRemove(context.Background(), id) + + if err != nil { + return err + } + + return nil +} + +func NewDocker(c *client.Client) *docker { + return &docker{c: c} } diff --git a/handlers/bootstrap.go b/handlers/bootstrap.go new file mode 100644 index 0000000..08e8205 --- /dev/null +++ b/handlers/bootstrap.go @@ -0,0 +1,38 @@ +package handlers + +import ( + "log" + "os" + + "github.com/docker/docker/client" + "github.com/play-with-docker/play-with-docker/docker" + "github.com/play-with-docker/play-with-docker/pwd" +) + +var core pwd.PWDApi +var Broadcast pwd.BroadcastApi + +func Bootstrap() { + c, err := client.NewEnvClient() + if err != nil { + log.Fatal(err) + } + + d := docker.NewDocker(c) + + Broadcast, err = pwd.NewBroadcast(WS, WSError) + if err != nil { + log.Fatal(err) + } + + t := pwd.NewScheduler(Broadcast, d) + + s := pwd.NewStorage() + + core = pwd.NewPWD(d, t, Broadcast, s) + + err = core.SessionLoadAndPrepare() + if err != nil && !os.IsNotExist(err) { + log.Fatal("Error decoding sessions from disk ", err) + } +} diff --git a/handlers/delete_instance.go b/handlers/delete_instance.go index 76e0265..7311e07 100644 --- a/handlers/delete_instance.go +++ b/handlers/delete_instance.go @@ -4,7 +4,6 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/play-with-docker/play-with-docker/services" ) func DeleteInstance(rw http.ResponseWriter, req *http.Request) { @@ -12,11 +11,9 @@ func DeleteInstance(rw http.ResponseWriter, req *http.Request) { sessionId := vars["sessionId"] instanceName := vars["instanceName"] - s := services.GetSession(sessionId) - s.Lock() - defer s.Unlock() - i := services.GetInstance(s, instanceName) - err := services.DeleteInstance(s, i) + s := core.SessionGet(sessionId) + i := core.InstanceGet(s, instanceName) + err := core.InstanceDelete(s, i) if err != nil { rw.WriteHeader(http.StatusInternalServerError) return diff --git a/handlers/dns.go b/handlers/dns.go index 7cdd27e..e199296 100644 --- a/handlers/dns.go +++ b/handlers/dns.go @@ -8,7 +8,6 @@ import ( "github.com/miekg/dns" "github.com/play-with-docker/play-with-docker/config" - "github.com/play-with-docker/play-with-docker/services" ) func DnsRequest(w dns.ResponseWriter, r *dns.Msg) { @@ -37,7 +36,7 @@ func DnsRequest(w dns.ResponseWriter, r *dns.Msg) { match := config.AliasFilter.FindStringSubmatch(question) - i := services.FindInstanceByAlias(match[2], match[1]) + i := core.InstanceFindByAlias(match[2], match[1]) m := new(dns.Msg) m.SetReply(r) diff --git a/handlers/exec.go b/handlers/exec.go index eba4a93..37370ed 100644 --- a/handlers/exec.go +++ b/handlers/exec.go @@ -6,7 +6,6 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/play-with-docker/play-with-docker/services" ) type execRequest struct { @@ -19,6 +18,7 @@ type execResponse struct { func Exec(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) + sessionId := vars["sessionId"] instanceName := vars["instanceName"] var er execRequest @@ -28,7 +28,18 @@ func Exec(rw http.ResponseWriter, req *http.Request) { return } - code, err := services.Exec(instanceName, er.Cmd) + s := core.SessionGet(sessionId) + if s == nil { + rw.WriteHeader(http.StatusNotFound) + return + } + i := core.InstanceGet(s, instanceName) + if i == nil { + rw.WriteHeader(http.StatusNotFound) + return + } + + code, err := core.InstanceExec(i, er.Cmd) if err != nil { log.Println(err) diff --git a/handlers/file_upload.go b/handlers/file_upload.go index cd8099d..4241b62 100644 --- a/handlers/file_upload.go +++ b/handlers/file_upload.go @@ -5,7 +5,6 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/play-with-docker/play-with-docker/services" ) func FileUpload(rw http.ResponseWriter, req *http.Request) { @@ -13,14 +12,14 @@ func FileUpload(rw http.ResponseWriter, req *http.Request) { sessionId := vars["sessionId"] instanceName := vars["instanceName"] - s := services.GetSession(sessionId) - i := services.GetInstance(s, instanceName) + s := core.SessionGet(sessionId) + i := core.InstanceGet(s, instanceName) // allow up to 32 MB which is the default // has a url query parameter, ignore body if url := req.URL.Query().Get("url"); url != "" { - err := i.UploadFromURL(req.URL.Query().Get("url")) + err := core.InstanceUploadFromUrl(i, req.URL.Query().Get("url")) if err != nil { log.Println(err) rw.WriteHeader(http.StatusInternalServerError) diff --git a/handlers/get_instance_images.go b/handlers/get_instance_images.go index f102044..79de76e 100644 --- a/handlers/get_instance_images.go +++ b/handlers/get_instance_images.go @@ -3,11 +3,9 @@ package handlers import ( "encoding/json" "net/http" - - "github.com/play-with-docker/play-with-docker/services" ) func GetInstanceImages(rw http.ResponseWriter, req *http.Request) { - instanceImages := services.InstanceImages() + instanceImages := core.InstanceAllowedImages() json.NewEncoder(rw).Encode(instanceImages) } diff --git a/handlers/get_session.go b/handlers/get_session.go index 741d75b..55aba26 100644 --- a/handlers/get_session.go +++ b/handlers/get_session.go @@ -5,14 +5,13 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/play-with-docker/play-with-docker/services" ) func GetSession(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) sessionId := vars["sessionId"] - session := services.GetSession(sessionId) + session := core.SessionGet(sessionId) if session == nil { rw.WriteHeader(http.StatusNotFound) diff --git a/handlers/home.go b/handlers/home.go index 6731098..fbe1bdd 100644 --- a/handlers/home.go +++ b/handlers/home.go @@ -4,16 +4,15 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/play-with-docker/play-with-docker/services" ) func Home(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) sessionId := vars["sessionId"] - s := services.GetSession(sessionId) + s := core.SessionGet(sessionId) if s.Stack != "" { - go s.DeployStack() + go core.SessionDeployStack(s) } http.ServeFile(w, r, "./www/index.html") } diff --git a/handlers/new_instance.go b/handlers/new_instance.go index b9d959e..08ad7ff 100644 --- a/handlers/new_instance.go +++ b/handlers/new_instance.go @@ -6,27 +6,25 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/play-with-docker/play-with-docker/services" + "github.com/play-with-docker/play-with-docker/pwd" ) func NewInstance(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) sessionId := vars["sessionId"] - body := services.InstanceConfig{} + body := pwd.InstanceConfig{} json.NewDecoder(req.Body).Decode(&body) - s := services.GetSession(sessionId) + s := core.SessionGet(sessionId) - s.Lock() - defer s.Unlock() if len(s.Instances) >= 5 { rw.WriteHeader(http.StatusConflict) return } - i, err := services.NewInstance(s, body) + i, err := core.InstanceNew(s, body) if err != nil { log.Println(err) rw.WriteHeader(http.StatusInternalServerError) diff --git a/handlers/new_session.go b/handlers/new_session.go index fc344de..609ac7f 100644 --- a/handlers/new_session.go +++ b/handlers/new_session.go @@ -7,7 +7,7 @@ import ( "net/http" "github.com/play-with-docker/play-with-docker/config" - "github.com/play-with-docker/play-with-docker/services" + "github.com/play-with-docker/play-with-docker/recaptcha" ) type NewSessionResponse struct { @@ -17,7 +17,7 @@ type NewSessionResponse struct { func NewSession(rw http.ResponseWriter, req *http.Request) { req.ParseForm() - if !services.IsHuman(req, rw) { + if !recaptcha.IsHuman(req, rw) { // User it not a human rw.WriteHeader(http.StatusForbidden) return @@ -38,8 +38,8 @@ func NewSession(rw http.ResponseWriter, req *http.Request) { } } - duration := services.GetDuration(reqDur) - s, err := services.NewSession(duration, stack) + duration := config.GetDuration(reqDur) + s, err := core.SessionNew(duration, stack, "") if err != nil { log.Println(err) //TODO: Return some error code diff --git a/handlers/reverseproxy.go b/handlers/reverseproxy.go index 4a2fbc8..98ad132 100644 --- a/handlers/reverseproxy.go +++ b/handlers/reverseproxy.go @@ -11,7 +11,6 @@ import ( "github.com/gorilla/mux" "github.com/play-with-docker/play-with-docker/config" - "github.com/play-with-docker/play-with-docker/services" ) func getTargetInfo(vars map[string]string, req *http.Request) (string, string) { @@ -29,7 +28,7 @@ func getTargetInfo(vars map[string]string, req *http.Request) (string, string) { } if alias != "" { - instance := services.FindInstanceByAlias(sessionPrefix, alias) + instance := core.InstanceFindByAlias(sessionPrefix, alias) if instance != nil { node = instance.IP return node, port diff --git a/handlers/tlsproxy.go b/handlers/tlsproxy.go index 34a7919..4a1ed03 100644 --- a/handlers/tlsproxy.go +++ b/handlers/tlsproxy.go @@ -9,7 +9,6 @@ import ( vhost "github.com/inconshreveable/go-vhost" "github.com/play-with-docker/play-with-docker/config" - "github.com/play-with-docker/play-with-docker/services" ) func StartTLSProxy(port string) { @@ -51,7 +50,7 @@ func StartTLSProxy(port string) { } else { alias := match[1] sessionPrefix := match[2] - instance := services.FindInstanceByAlias(sessionPrefix, alias) + instance := core.InstanceFindByAlias(sessionPrefix, alias) if instance != nil { targetIP = instance.IP } else { diff --git a/handlers/ws.go b/handlers/ws.go index b51b4b6..37e8b8f 100644 --- a/handlers/ws.go +++ b/handlers/ws.go @@ -6,7 +6,6 @@ import ( "github.com/googollee/go-socket.io" "github.com/gorilla/mux" - "github.com/play-with-docker/play-with-docker/services" ) func WS(so socketio.Socket) { @@ -19,74 +18,36 @@ func WS(so socketio.Socket) { sessionId := vars["sessionId"] - session := services.GetSession(sessionId) + session := core.SessionGet(sessionId) if session == nil { log.Printf("Session with id [%s] does not exist!\n", sessionId) return } - session.AddNewClient(services.NewClient(so, session)) -} -func WSError(so socketio.Socket) { - log.Println("error ws") -} + so.Join(session.Id) -/* - so.Join(sessionId) + client := core.ClientNew(so.Id(), session) - // TODO: Reset terminal geometry + so.On("session close", func() { + core.SessionClose(session) + }) - so.On("resize", func(cols, rows int) { - // TODO: Reset terminal geometry + so.On("terminal in", func(name, data string) { + // User wrote something on the terminal. Need to write it to the instance terminal + instance := core.InstanceGet(session, name) + core.InstanceWriteToTerminal(instance, data) + }) + + so.On("viewport resize", func(cols, rows uint) { + // User resized his viewport + core.ClientResizeViewPort(client, cols, rows) }) so.On("disconnection", func() { - //TODO: reset the best terminal geometry + core.ClientClose(client) }) - - ctx := context.Background() - - session := services.GetSession(sessionId) - instance := services.GetInstance(session, instanceName) - - if instance.Stdout == nil { - id, err := services.CreateExecConnection(instance.Name, ctx) - if err != nil { - return - } - conn, err := services.AttachExecConnection(id, ctx) - if err != nil { - return - } - - encoder := encoding.Replacement.NewEncoder() - instance.Conn = conn - instance.Stdout = &cookoo.MultiWriter{} - instance.Stdout.Init() - u1 := uuid.NewV4() - instance.Stdout.AddWriter(u1.String(), ws) - go func() { - io.Copy(encoder.Writer(instance.Stdout), instance.Conn.Reader) - instance.Stdout.RemoveWriter(u1.String()) - }() - go func() { - io.Copy(instance.Conn.Conn, ws) - instance.Stdout.RemoveWriter(u1.String()) - }() - select { - case <-ctx.Done(): - } - } else { - u1 := uuid.NewV4() - instance.Stdout.AddWriter(u1.String(), ws) - - go func() { - io.Copy(instance.Conn.Conn, ws) - instance.Stdout.RemoveWriter(u1.String()) - }() - select { - case <-ctx.Done(): - } - } } -*/ + +func WSError(so socketio.Socket) { + log.Println("error ws") +} diff --git a/pwd/broadcast.go b/pwd/broadcast.go new file mode 100644 index 0000000..1f54d08 --- /dev/null +++ b/pwd/broadcast.go @@ -0,0 +1,34 @@ +package pwd + +import ( + "net/http" + + "github.com/googollee/go-socket.io" +) + +type BroadcastApi interface { + BroadcastTo(sessionId, eventName string, args ...interface{}) + GetHandler() http.Handler +} + +type broadcast struct { + sio *socketio.Server +} + +func (b *broadcast) BroadcastTo(sessionId, eventName string, args ...interface{}) { + b.sio.BroadcastTo(sessionId, eventName, args...) +} + +func (b *broadcast) GetHandler() http.Handler { + return b.sio +} + +func NewBroadcast(connectionEvent, errorEvent interface{}) (*broadcast, error) { + server, err := socketio.NewServer(nil) + if err != nil { + return nil, err + } + server.On("connection", connectionEvent) + server.On("error", errorEvent) + return &broadcast{sio: server}, nil +} diff --git a/pwd/broadcast_mock_test.go b/pwd/broadcast_mock_test.go new file mode 100644 index 0000000..d9735fc --- /dev/null +++ b/pwd/broadcast_mock_test.go @@ -0,0 +1,12 @@ +package pwd + +import "net/http" + +type mockBroadcast struct { +} + +func (m *mockBroadcast) BroadcastTo(sessionId, eventName string, args ...interface{}) { +} +func (m *mockBroadcast) GetHandler() http.Handler { + return nil +} diff --git a/services/check_swarm_status_task.go b/pwd/check_swarm_status_task.go similarity index 86% rename from services/check_swarm_status_task.go rename to pwd/check_swarm_status_task.go index b665724..297a36b 100644 --- a/services/check_swarm_status_task.go +++ b/pwd/check_swarm_status_task.go @@ -1,4 +1,4 @@ -package services +package pwd import ( "log" @@ -10,7 +10,7 @@ type checkSwarmStatusTask struct { } func (c checkSwarmStatusTask) Run(i *Instance) error { - if info, err := GetDaemonInfo(i); err == nil { + if info, err := i.docker.GetDaemonInfo(); err == nil { if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked { i.IsManager = &info.Swarm.ControlAvailable } else { diff --git a/pwd/check_swarm_used_ports.go b/pwd/check_swarm_used_ports.go new file mode 100644 index 0000000..c01025b --- /dev/null +++ b/pwd/check_swarm_used_ports.go @@ -0,0 +1,30 @@ +package pwd + +import ( + "fmt" + "log" +) + +type checkSwarmUsedPortsTask struct { +} + +func (c checkSwarmUsedPortsTask) Run(i *Instance) error { + if i.IsManager != nil && *i.IsManager { + sessionPrefix := i.session.Id[:8] + // This is a swarm manager instance, then check for ports + if hosts, ports, err := i.docker.GetSwarmPorts(); err != nil { + log.Println(err) + return err + } else { + for _, host := range hosts { + host = fmt.Sprintf("%s_%s", sessionPrefix, host) + for _, port := range ports { + if i.session.Instances[host] != nil { + i.session.Instances[host].setUsedPort(port) + } + } + } + } + } + return nil +} diff --git a/services/check_used_ports_task.go b/pwd/check_used_ports_task.go similarity index 77% rename from services/check_used_ports_task.go rename to pwd/check_used_ports_task.go index e9d5b1c..deae46a 100644 --- a/services/check_used_ports_task.go +++ b/pwd/check_used_ports_task.go @@ -1,4 +1,4 @@ -package services +package pwd import "log" @@ -6,7 +6,7 @@ type checkUsedPortsTask struct { } func (c checkUsedPortsTask) Run(i *Instance) error { - if ports, err := GetUsedPorts(i); err == nil { + if ports, err := i.docker.GetPorts(); err == nil { for _, p := range ports { i.setUsedPort(uint16(p)) } diff --git a/pwd/client.go b/pwd/client.go new file mode 100644 index 0000000..79fec01 --- /dev/null +++ b/pwd/client.go @@ -0,0 +1,54 @@ +package pwd + +import "log" + +type Client struct { + Id string + viewPort ViewPort + session *Session +} + +type ViewPort struct { + Rows uint + Cols uint +} + +func (p *pwd) ClientNew(id string, session *Session) *Client { + c := &Client{Id: id, session: session} + session.clients = append(session.clients, c) + return c +} + +func (p *pwd) ClientResizeViewPort(c *Client, cols, rows uint) { + c.viewPort.Rows = rows + c.viewPort.Cols = cols + + p.notifyClientSmallestViewPort(c.session) +} + +func (p *pwd) ClientClose(client *Client) { + // Client has disconnected. Remove from session and recheck terminal sizes. + session := client.session + for i, cl := range session.clients { + if cl.Id == client.Id { + session.clients = append(session.clients[:i], session.clients[i+1:]...) + break + } + } + if len(session.clients) > 0 { + p.notifyClientSmallestViewPort(session) + } + setGauges() +} + +func (p *pwd) notifyClientSmallestViewPort(session *Session) { + vp := p.SessionGetSmallestViewPort(session) + // Resize all terminals in the session + p.broadcast.BroadcastTo(session.Id, "viewport resize", vp.Cols, vp.Rows) + for _, instance := range session.Instances { + err := p.InstanceResizeTerminal(instance, vp.Rows, vp.Cols) + if err != nil { + log.Println("Error resizing terminal", err) + } + } +} diff --git a/services/collect_stats_task.go b/pwd/collect_stats_task.go similarity index 92% rename from services/collect_stats_task.go rename to pwd/collect_stats_task.go index a2ec31d..200c35c 100644 --- a/services/collect_stats_task.go +++ b/pwd/collect_stats_task.go @@ -1,4 +1,4 @@ -package services +package pwd import ( "encoding/json" @@ -7,6 +7,7 @@ import ( "github.com/docker/docker/api/types" units "github.com/docker/go-units" + "github.com/play-with-docker/play-with-docker/docker" ) type collectStatsTask struct { @@ -17,10 +18,12 @@ type collectStatsTask struct { cpuPercent float64 previousCPU uint64 previousSystem uint64 + + docker docker.DockerApi } func (c collectStatsTask) Run(i *Instance) error { - reader, err := GetContainerStats(i.Name) + reader, err := c.docker.GetContainerStats(i.Name) if err != nil { log.Println("Error while trying to collect instance stats", err) return err diff --git a/pwd/docker_mock_test.go b/pwd/docker_mock_test.go index 325af4e..b0720d4 100644 --- a/pwd/docker_mock_test.go +++ b/pwd/docker_mock_test.go @@ -1,9 +1,68 @@ package pwd +import ( + "io" + "net" + + "github.com/docker/docker/api/types" + "github.com/play-with-docker/play-with-docker/docker" +) + type mockDocker struct { - createNetwork func(string) error + createNetwork func(string) error + connectNetwork func(container, network, ip string) (string, error) } func (m *mockDocker) CreateNetwork(id string) error { + if m.createNetwork == nil { + return nil + } return m.createNetwork(id) } +func (m *mockDocker) ConnectNetwork(container, network, ip string) (string, error) { + if m.connectNetwork == nil { + return "10.0.0.1", nil + } + return m.connectNetwork(container, network, ip) +} + +func (m *mockDocker) GetDaemonInfo() (types.Info, error) { + return types.Info{}, nil +} + +func (m *mockDocker) GetSwarmPorts() ([]string, []uint16, error) { + return []string{}, []uint16{}, nil +} +func (m *mockDocker) GetPorts() ([]uint16, error) { + return []uint16{}, nil +} +func (m *mockDocker) GetContainerStats(name string) (io.ReadCloser, error) { + return nil, nil +} +func (m *mockDocker) ContainerResize(name string, rows, cols uint) error { + return nil +} +func (m *mockDocker) CreateAttachConnection(name string) (net.Conn, error) { + return nil, nil +} +func (m *mockDocker) CopyToContainer(containerName, destination, fileName string, content io.Reader) error { + return nil +} +func (m *mockDocker) DeleteContainer(id string) error { + return nil +} +func (m *mockDocker) CreateContainer(opts docker.CreateContainerOpts) (string, error) { + return "", nil +} +func (m *mockDocker) ExecAttach(instanceName string, command []string, out io.Writer) (int, error) { + return 0, nil +} +func (m *mockDocker) DisconnectNetwork(containerId, networkId string) error { + return nil +} +func (m *mockDocker) DeleteNetwork(id string) error { + return nil +} +func (m *mockDocker) Exec(instanceName string, command []string) (int, error) { + return 0, nil +} diff --git a/pwd/instance.go b/pwd/instance.go new file mode 100644 index 0000000..fcf41ba --- /dev/null +++ b/pwd/instance.go @@ -0,0 +1,266 @@ +package pwd + +import ( + "context" + "fmt" + "io" + "log" + "net" + "net/http" + "path/filepath" + "strings" + "sync" + + "github.com/play-with-docker/play-with-docker/config" + "github.com/play-with-docker/play-with-docker/docker" + + "golang.org/x/text/encoding" +) + +type sessionWriter struct { + sessionId string + instanceName string + broadcast BroadcastApi +} + +func (s *sessionWriter) Write(p []byte) (n int, err error) { + s.broadcast.BroadcastTo(s.sessionId, "terminal out", s.instanceName, string(p)) + return len(p), nil +} + +type UInt16Slice []uint16 + +func (p UInt16Slice) Len() int { return len(p) } +func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +type Instance struct { + rw sync.Mutex + session *Session `json:"-"` + Name string `json:"name"` + Hostname string `json:"hostname"` + IP string `json:"ip"` + conn net.Conn `json:"-"` + ctx context.Context `json:"-"` + docker docker.DockerApi `json:"-"` + IsManager *bool `json:"is_manager"` + Mem string `json:"mem"` + Cpu string `json:"cpu"` + Alias string `json:"alias"` + tempPorts []uint16 `json:"-"` + ServerCert []byte `json:"server_cert"` + ServerKey []byte `json:"server_key"` + CACert []byte `json:"ca_cert"` + Cert []byte `json:"cert"` + Key []byte `json:"key"` + Ports UInt16Slice +} +type InstanceConfig struct { + ImageName string + Alias string + ServerCert []byte + ServerKey []byte + CACert []byte + Cert []byte + Key []byte +} + +func (i *Instance) setUsedPort(port uint16) { + i.rw.Lock() + defer i.rw.Unlock() + + for _, p := range i.tempPorts { + if p == port { + return + } + } + i.tempPorts = append(i.tempPorts, port) +} +func (i *Instance) IsConnected() bool { + return i.conn != nil + +} + +func (i *Instance) SetSession(s *Session) { + i.session = s +} + +func (p *pwd) InstanceResizeTerminal(instance *Instance, rows, cols uint) error { + return p.docker.ContainerResize(instance.Name, rows, cols) +} + +func (p *pwd) InstanceAttachTerminal(instance *Instance) error { + conn, err := p.docker.CreateAttachConnection(instance.Name) + + if err != nil { + return err + } + + encoder := encoding.Replacement.NewEncoder() + sw := &sessionWriter{sessionId: instance.session.Id, instanceName: instance.Name, broadcast: p.broadcast} + instance.conn = conn + io.Copy(encoder.Writer(sw), conn) + + return nil +} + +func (p *pwd) InstanceUploadFromUrl(instance *Instance, url string) error { + log.Printf("Downloading file [%s]\n", url) + resp, err := http.Get(url) + if err != nil { + return fmt.Errorf("Could not download file [%s]. Error: %s\n", url, err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("Could not download file [%s]. Status code: %d\n", url, resp.StatusCode) + } + + _, fileName := filepath.Split(url) + + copyErr := p.docker.CopyToContainer(instance.Name, "/var/run/pwd/uploads", fileName, resp.Body) + + if copyErr != nil { + return fmt.Errorf("Error while downloading file [%s]. Error: %s\n", url, copyErr) + } + + return nil +} + +func (p *pwd) InstanceGet(session *Session, name string) *Instance { + return session.Instances[name] +} + +func (p *pwd) InstanceFindByIP(ip string) *Instance { + for _, s := range sessions { + for _, i := range s.Instances { + if i.IP == ip { + return i + } + } + } + return nil +} + +func (p *pwd) InstanceFindByAlias(sessionPrefix, alias string) *Instance { + for id, s := range sessions { + if strings.HasPrefix(id, sessionPrefix) { + for _, i := range s.Instances { + if i.Alias == alias { + return i + } + } + } + } + return nil +} + +func (p *pwd) InstanceDelete(session *Session, instance *Instance) error { + if instance.conn != nil { + instance.conn.Close() + } + err := p.docker.DeleteContainer(instance.Name) + if err != nil && !strings.Contains(err.Error(), "No such container") { + log.Println(err) + return err + } + + p.broadcast.BroadcastTo(session.Id, "delete instance", instance.Name) + + delete(session.Instances, instance.Name) + if err := p.storage.Save(); err != nil { + return err + } + + setGauges() + + return nil +} + +func (p *pwd) InstanceNew(session *Session, conf InstanceConfig) (*Instance, error) { + if conf.ImageName == "" { + conf.ImageName = config.GetDindImageName() + } + log.Printf("NewInstance - using image: [%s]\n", conf.ImageName) + + var nodeName string + var containerName string + for i := 1; ; i++ { + nodeName = fmt.Sprintf("node%d", i) + containerName = fmt.Sprintf("%s_%s", session.Id[:8], nodeName) + exists := false + for _, instance := range session.Instances { + if instance.Name == containerName { + exists = true + break + } + } + if !exists { + break + } + } + + opts := docker.CreateContainerOpts{ + Image: config.GetDindImageName(), + SessionId: session.Id, + PwdIpAddress: session.PwdIpAddress, + ContainerName: containerName, + Hostname: nodeName, + ServerCert: conf.ServerCert, + ServerKey: conf.ServerKey, + CACert: conf.CACert, + } + + ip, err := p.docker.CreateContainer(opts) + if err != nil { + return nil, err + } + + instance := &Instance{} + instance.IP = ip + instance.Name = containerName + instance.Hostname = nodeName + instance.Alias = conf.Alias + instance.Cert = conf.Cert + instance.Key = conf.Key + instance.ServerCert = conf.ServerCert + instance.ServerKey = conf.ServerKey + instance.CACert = conf.CACert + instance.session = session + + if session.Instances == nil { + session.Instances = make(map[string]*Instance) + } + session.Instances[instance.Name] = instance + + go p.InstanceAttachTerminal(instance) + + err = p.storage.Save() + if err != nil { + return nil, err + } + + p.broadcast.BroadcastTo(session.Id, "new instance", instance.Name, instance.IP, instance.Hostname) + + setGauges() + + return instance, nil +} + +func (p *pwd) InstanceWriteToTerminal(instance *Instance, data string) { + if instance != nil && instance.conn != nil && len(data) > 0 { + instance.conn.Write([]byte(data)) + } +} + +func (p *pwd) InstanceAllowedImages() []string { + + return []string{ + config.GetDindImageName(), + "franela/dind:overlay2-dev", + } + +} + +func (p *pwd) InstanceExec(instance *Instance, cmd []string) (int, error) { + return p.docker.Exec(instance.Name, cmd) +} diff --git a/pwd/pwd.go b/pwd/pwd.go index 9338b53..04412a8 100644 --- a/pwd/pwd.go +++ b/pwd/pwd.go @@ -1,41 +1,83 @@ package pwd import ( - "sync" "time" "github.com/play-with-docker/play-with-docker/docker" + "github.com/prometheus/client_golang/prometheus" ) -type Session struct { - rw sync.Mutex - Id string `json:"id"` - Instances map[string]*Instance `json:"instances"` - clients []*Client `json:"-"` - CreatedAt time.Time `json:"created_at"` - ExpiresAt time.Time `json:"expires_at"` - scheduled bool `json:"-"` - ticker *time.Ticker `json:"-"` - PwdIpAddress string `json:"pwd_ip_address"` - Ready bool `json:"ready"` - Stack string `json:"stack"` - closingTimer *time.Timer `json:"-"` -} +var ( + sessionsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "sessions", + Help: "Sessions", + }) + clientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "clients", + Help: "Clients", + }) + instancesGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "instances", + Help: "Instances", + }) +) -type Instance struct { -} +var sessions map[string]*Session -type Client struct { +func init() { + prometheus.MustRegister(sessionsGauge) + prometheus.MustRegister(clientsGauge) + prometheus.MustRegister(instancesGauge) + + sessions = make(map[string]*Session) } type pwd struct { - docker docker.Docker `json:"-"` + docker docker.DockerApi + tasks SchedulerApi + broadcast BroadcastApi + storage StorageApi } type PWDApi interface { - NewSession(duration time.Duration, stack string) (*Session, error) + SessionNew(duration time.Duration, stack string, stackName string) (*Session, error) + SessionClose(session *Session) error + SessionGetSmallestViewPort(session *Session) ViewPort + SessionDeployStack(session *Session) error + SessionGet(id string) *Session + SessionLoadAndPrepare() error + + InstanceNew(session *Session, conf InstanceConfig) (*Instance, error) + InstanceResizeTerminal(instance *Instance, cols, rows uint) error + InstanceAttachTerminal(instance *Instance) error + InstanceUploadFromUrl(instance *Instance, url string) error + InstanceGet(session *Session, name string) *Instance + InstanceFindByIP(ip string) *Instance + InstanceFindByAlias(sessionPrefix, alias string) *Instance + InstanceDelete(session *Session, instance *Instance) error + InstanceWriteToTerminal(instance *Instance, data string) + InstanceAllowedImages() []string + InstanceExec(instance *Instance, cmd []string) (int, error) + + ClientNew(id string, session *Session) *Client + ClientResizeViewPort(client *Client, cols, rows uint) + ClientClose(client *Client) } -func NewPWD(d docker.Docker) pwd { - return pwd{docker: d} +func NewPWD(d docker.DockerApi, t SchedulerApi, b BroadcastApi, s StorageApi) *pwd { + return &pwd{docker: d, tasks: t, broadcast: b, storage: s} +} + +func setGauges() { + var ins float64 + var cli float64 + + for _, s := range sessions { + ins += float64(len(s.Instances)) + cli += float64(len(s.clients)) + } + + clientsGauge.Set(cli) + instancesGauge.Set(ins) + sessionsGauge.Set(float64(len(sessions))) } diff --git a/pwd/session.go b/pwd/session.go index b64cb9d..47a94c3 100644 --- a/pwd/session.go +++ b/pwd/session.go @@ -1,25 +1,58 @@ package pwd import ( + "fmt" "log" + "math" + "path" + "strings" + "sync" "time" - "github.com/franela/play-with-docker.old/config" + "github.com/play-with-docker/play-with-docker/config" "github.com/twinj/uuid" ) -func (p *pwd) NewSession(duration time.Duration, stack, stackName string) (*Session, error) { +type sessionBuilderWriter struct { + sessionId string + broadcast BroadcastApi +} + +func (s *sessionBuilderWriter) Write(p []byte) (n int, err error) { + s.broadcast.BroadcastTo(s.sessionId, "session builder out", string(p)) + return len(p), nil +} + +type Session struct { + rw sync.Mutex + Id string `json:"id"` + Instances map[string]*Instance `json:"instances"` + CreatedAt time.Time `json:"created_at"` + ExpiresAt time.Time `json:"expires_at"` + PwdIpAddress string `json:"pwd_ip_address"` + Ready bool `json:"ready"` + Stack string `json:"stack"` + StackName string `json:"stack_name"` + closingTimer *time.Timer `json:"-"` + scheduled bool `json:"-"` + clients []*Client `json:"-"` + ticker *time.Ticker `json:"-"` +} + +func (p *pwd) SessionNew(duration time.Duration, stack, stackName string) (*Session, error) { s := &Session{} s.Id = uuid.NewV4().String() s.Instances = map[string]*Instance{} s.CreatedAt = time.Now() s.ExpiresAt = s.CreatedAt.Add(duration) - /* - if stack == "" { - s.Ready = true - } - s.Stack = stack - */ + s.Ready = true + s.Stack = stack + s.StackName = stackName + + if s.Stack != "" { + s.Ready = false + } + log.Printf("NewSession id=[%s]\n", s.Id) if err := p.docker.CreateNetwork(s.Id); err != nil { @@ -28,39 +61,192 @@ func (p *pwd) NewSession(duration time.Duration, stack, stackName string) (*Sess } log.Printf("Network [%s] created for session [%s]\n", s.Id, s.Id) - s.Prepare() + if err := p.prepareSession(s); err != nil { + log.Println(err) + return nil, err + } + + sessions[s.Id] = s + if err := p.storage.Save(); err != nil { + log.Println(err) + return nil, err + } + + setGauges() return s, nil } +func (p *pwd) SessionClose(s *Session) error { + s.rw.Lock() + defer s.rw.Unlock() + + if s.ticker != nil { + s.ticker.Stop() + } + p.broadcast.BroadcastTo(s.Id, "session end") + p.broadcast.BroadcastTo(s.Id, "disconnect") + log.Printf("Starting clean up of session [%s]\n", s.Id) + for _, i := range s.Instances { + err := p.InstanceDelete(s, i) + if err != nil { + log.Println(err) + return err + } + } + // Disconnect PWD daemon from the network + if err := p.docker.DisconnectNetwork(config.PWDContainerName, s.Id); err != nil { + if !strings.Contains(err.Error(), "is not connected to the network") { + log.Println("ERROR NETWORKING") + return err + } + } + log.Printf("Disconnected pwd from network [%s]\n", s.Id) + if err := p.docker.DeleteNetwork(s.Id); err != nil { + if !strings.Contains(err.Error(), "not found") { + log.Println(err) + return err + } + } + delete(sessions, s.Id) + + // We store sessions as soon as we delete one + if err := p.storage.Save(); err != nil { + return err + } + setGauges() + log.Printf("Cleaned up session [%s]\n", s.Id) + return nil + +} + +func (p *pwd) SessionGetSmallestViewPort(s *Session) ViewPort { + minRows := s.clients[0].viewPort.Rows + minCols := s.clients[0].viewPort.Cols + + for _, c := range s.clients { + minRows = uint(math.Min(float64(minRows), float64(c.viewPort.Rows))) + minCols = uint(math.Min(float64(minCols), float64(c.viewPort.Cols))) + } + + return ViewPort{Rows: minRows, Cols: minCols} +} + +func (p *pwd) SessionDeployStack(s *Session) error { + s.rw.Lock() + defer s.rw.Unlock() + + if s.Ready { + // a stack was already deployed on this session, just ignore + return nil + } + + s.Ready = false + p.broadcast.BroadcastTo(s.Id, "session ready", s.Ready) + + i, err := p.InstanceNew(s, InstanceConfig{}) + if err != nil { + log.Printf("Error creating instance for stack [%s]: %s\n", s.Stack, err) + return err + } + err = p.InstanceUploadFromUrl(i, "https://raw.githubusercontent.com/play-with-docker/stacks/master"+s.Stack) + if err != nil { + log.Printf("Error uploading stack file [%s]: %s\n", s.Stack, err) + return err + } + + w := sessionBuilderWriter{sessionId: s.Id, broadcast: p.broadcast} + fileName := path.Base(s.Stack) + code, err := p.docker.ExecAttach(i.Name, []string{"docker-compose", "-f", "/var/run/pwd/uploads/" + fileName, "up", "-d"}, &w) + if err != nil { + log.Printf("Error executing stack [%s]: %s\n", s.Stack, err) + return err + } + + log.Printf("Stack execution finished with code %d\n", code) + + s.Ready = true + p.broadcast.BroadcastTo(s.Id, "session ready", s.Ready) + + if err := p.storage.Save(); err != nil { + return err + } + return nil +} + +func (p *pwd) SessionGet(sessionId string) *Session { + s := sessions[sessionId] + /* + if s != nil { + for _, instance := range s.Instances { + if !instance.IsConnected() { + instance.SetSession(s) + go instance.Attach() + } + } + + }*/ + return s +} + +func (p *pwd) SessionLoadAndPrepare() error { + err := p.storage.Load() + if err != nil { + return err + } + + for _, s := range sessions { + err := p.prepareSession(s) + if err != nil { + return err + } + for _, i := range s.Instances { + // wire the session back to the instance + i.session = s + go p.InstanceAttachTerminal(i) + } + // Connect PWD daemon to the new network + if s.PwdIpAddress == "" { + return fmt.Errorf("Cannot load stored sessions as they don't have the pwd ip address stored with them") + } + } + + setGauges() + + return nil +} + // This function should be called any time a session needs to be prepared: // 1. Like when it is created // 2. When it was loaded from storage -func (s *Session) Prepare() error { - s.scheduleSessionClose() +func (p *pwd) prepareSession(session *Session) error { + p.scheduleSessionClose(session) // Connect PWD daemon to the new network - s.connectToNetwork() + if err := p.connectToNetwork(session); err != nil { + return nil + } + + // Schedule periodic tasks + p.tasks.Schedule(session) return nil } -func (s *Session) scheduleSessionClose() { +func (p *pwd) scheduleSessionClose(s *Session) { timeLeft := s.ExpiresAt.Sub(time.Now()) s.closingTimer = time.AfterFunc(timeLeft, func() { - s.Close() + p.SessionClose(s) }) } -func (s *Session) Close() { -} - -func (s *Session) connectToNetwork() { - ip, err := ConnectNetwork(config.PWDContainerName, s.Id, "") +func (p *pwd) connectToNetwork(s *Session) error { + ip, err := p.docker.ConnectNetwork(config.PWDContainerName, s.Id, s.PwdIpAddress) if err != nil { log.Println("ERROR NETWORKING") - return nil, err + return err } s.PwdIpAddress = ip log.Printf("Connected %s to network [%s]\n", config.PWDContainerName, s.Id) + return nil } diff --git a/pwd/session_test.go b/pwd/session_test.go index ed5aea3..825d9b0 100644 --- a/pwd/session_test.go +++ b/pwd/session_test.go @@ -4,22 +4,41 @@ import ( "testing" "time" + "github.com/play-with-docker/play-with-docker/config" "github.com/stretchr/testify/assert" ) -func TestNewSession_WithoutStack(t *testing.T) { +func TestSessionNew(t *testing.T) { + config.PWDContainerName = "pwd" + var connectContainerName, connectNetworkName, connectIP string createdNetworkId := "" - mock := &mockDocker{} - mock.createNetwork = func(id string) error { + docker := &mockDocker{} + docker.createNetwork = func(id string) error { createdNetworkId = id return nil } + docker.connectNetwork = func(containerName, networkName, ip string) (string, error) { + connectContainerName = containerName + connectNetworkName = networkName + connectIP = ip + return "10.0.0.1", nil + } - p := NewPWD(mock) + var scheduledSession *Session + tasks := &mockTasks{} + tasks.schedule = func(s *Session) { + scheduledSession = s + } + + broadcast := &mockBroadcast{} + storage := &mockStorage{} + + p := NewPWD(docker, tasks, broadcast, storage) before := time.Now() - s, e := p.NewSession(time.Hour, "", "") + + s, e := p.SessionNew(time.Hour, "", "") assert.Nil(t, e) assert.NotNil(t, s) @@ -28,6 +47,21 @@ func TestNewSession_WithoutStack(t *testing.T) { assert.WithinDuration(t, s.CreatedAt, before, time.Since(before)) assert.WithinDuration(t, s.ExpiresAt, before.Add(time.Hour), time.Second) assert.Equal(t, s.Id, createdNetworkId) + assert.True(t, s.Ready) + + s, _ = p.SessionNew(time.Hour, "stackPath", "stackName") + + assert.Equal(t, "stackPath", s.Stack) + assert.Equal(t, "stackName", s.StackName) + assert.False(t, s.Ready) assert.NotNil(t, s.closingTimer) + + assert.Equal(t, config.PWDContainerName, connectContainerName) + assert.Equal(t, s.Id, connectNetworkName) + assert.Empty(t, connectIP) + + assert.Equal(t, "10.0.0.1", s.PwdIpAddress) + + assert.Equal(t, s, scheduledSession) } diff --git a/pwd/storage.go b/pwd/storage.go new file mode 100644 index 0000000..e3b2509 --- /dev/null +++ b/pwd/storage.go @@ -0,0 +1,50 @@ +package pwd + +import ( + "encoding/gob" + "os" + "sync" + + "github.com/play-with-docker/play-with-docker/config" +) + +type StorageApi interface { + Save() error + Load() error +} + +type storage struct { + rw sync.Mutex +} + +func (store *storage) Load() error { + file, err := os.Open(config.SessionsFile) + + if err == nil { + decoder := gob.NewDecoder(file) + err = decoder.Decode(&sessions) + + if err != nil { + return err + } + } + + file.Close() + return nil +} + +func (store *storage) Save() error { + store.rw.Lock() + defer store.rw.Unlock() + file, err := os.Create(config.SessionsFile) + if err == nil { + encoder := gob.NewEncoder(file) + err = encoder.Encode(&sessions) + } + file.Close() + return nil +} + +func NewStorage() *storage { + return &storage{} +} diff --git a/pwd/storage_mock_test.go b/pwd/storage_mock_test.go new file mode 100644 index 0000000..1144120 --- /dev/null +++ b/pwd/storage_mock_test.go @@ -0,0 +1,11 @@ +package pwd + +type mockStorage struct { +} + +func (m *mockStorage) Save() error { + return nil +} +func (m *mockStorage) Load() error { + return nil +} diff --git a/pwd/tasks.go b/pwd/tasks.go new file mode 100644 index 0000000..9a6071e --- /dev/null +++ b/pwd/tasks.go @@ -0,0 +1,118 @@ +package pwd + +import ( + "crypto/tls" + "fmt" + "log" + "net" + "net/http" + "sort" + "strings" + "sync" + "time" + + "github.com/docker/docker/api" + "github.com/docker/docker/client" + "github.com/docker/go-connections/tlsconfig" + "github.com/play-with-docker/play-with-docker/docker" +) + +type periodicTask interface { + Run(i *Instance) error +} + +type SchedulerApi interface { + Schedule(session *Session) + Unschedule(session *Session) +} + +type scheduler struct { + broadcast BroadcastApi + periodicTasks []periodicTask +} + +func (sch *scheduler) Schedule(s *Session) { + if s.scheduled { + return + } + + go func() { + s.scheduled = true + + s.ticker = time.NewTicker(1 * time.Second) + for range s.ticker.C { + var wg = sync.WaitGroup{} + wg.Add(len(s.Instances)) + for _, ins := range s.Instances { + var i *Instance = ins + if i.docker == nil { + // Need to create client to the DinD docker daemon + + // We check if the client needs to use TLS + var tlsConfig *tls.Config + if len(i.Cert) > 0 && len(i.Key) > 0 { + tlsConfig = tlsconfig.ClientDefault() + tlsConfig.InsecureSkipVerify = true + tlsCert, err := tls.X509KeyPair(i.Cert, i.Key) + if err != nil { + log.Println("Could not load X509 key pair: %v. Make sure the key is not encrypted", err) + continue + } + tlsConfig.Certificates = []tls.Certificate{tlsCert} + } + + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 1 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext} + if tlsConfig != nil { + transport.TLSClientConfig = tlsConfig + } + cli := &http.Client{ + Transport: transport, + } + c, err := client.NewClient(fmt.Sprintf("http://%s:2375", i.IP), api.DefaultVersion, cli, nil) + if err != nil { + log.Println("Could not connect to DinD docker daemon", err) + } else { + i.docker = docker.NewDocker(c) + } + } + go func() { + defer wg.Done() + for _, t := range sch.periodicTasks { + err := t.Run(i) + if err != nil { + if strings.Contains(err.Error(), "No such container") { + log.Printf("Container for instance [%s] doesn't exist any more.\n", i.IP) + //DeleteInstance(i.session, i) + } else { + log.Println(err) + } + break + } + } + }() + } + wg.Wait() + // broadcast all information + for _, ins := range s.Instances { + ins.Ports = UInt16Slice(ins.tempPorts) + sort.Sort(ins.Ports) + ins.tempPorts = []uint16{} + + sch.broadcast.BroadcastTo(ins.session.Id, "instance stats", ins.Name, ins.Mem, ins.Cpu, ins.IsManager, ins.Ports) + } + } + }() +} + +func (sch *scheduler) Unschedule(s *Session) { +} + +func NewScheduler(b BroadcastApi, d docker.DockerApi) *scheduler { + s := &scheduler{broadcast: b} + s.periodicTasks = []periodicTask{&collectStatsTask{docker: d}, &checkSwarmStatusTask{}, &checkUsedPortsTask{}, &checkSwarmUsedPortsTask{}} + return s +} diff --git a/pwd/tasks_mock_test.go b/pwd/tasks_mock_test.go new file mode 100644 index 0000000..e990c61 --- /dev/null +++ b/pwd/tasks_mock_test.go @@ -0,0 +1,17 @@ +package pwd + +type mockTasks struct { + schedule func(s *Session) + unschedule func(s *Session) +} + +func (m *mockTasks) Schedule(s *Session) { + if m.schedule != nil { + m.schedule(s) + } +} +func (m *mockTasks) Unschedule(s *Session) { + if m.unschedule != nil { + m.unschedule(s) + } +} diff --git a/services/recaptcha.go b/recaptcha/recaptcha.go similarity index 99% rename from services/recaptcha.go rename to recaptcha/recaptcha.go index e9455ac..4e6b80b 100644 --- a/services/recaptcha.go +++ b/recaptcha/recaptcha.go @@ -1,4 +1,4 @@ -package services +package recaptcha import ( "encoding/json" diff --git a/services/check_swarm_used_ports.go b/services/check_swarm_used_ports.go deleted file mode 100644 index 2ded9e7..0000000 --- a/services/check_swarm_used_ports.go +++ /dev/null @@ -1,17 +0,0 @@ -package services - -import "log" - -type checkSwarmUsedPortsTask struct { -} - -func (c checkSwarmUsedPortsTask) Run(i *Instance) error { - if i.IsManager != nil && *i.IsManager { - // This is a swarm manager instance, then check for ports - if err := SetInstanceSwarmPorts(i); err != nil { - log.Println(err) - return err - } - } - return nil -} diff --git a/services/client.go b/services/client.go deleted file mode 100644 index 3409e07..0000000 --- a/services/client.go +++ /dev/null @@ -1,75 +0,0 @@ -package services - -import ( - "log" - - "github.com/googollee/go-socket.io" -) - -type ViewPort struct { - Rows uint - Cols uint -} - -type Client struct { - Id string - so socketio.Socket - ViewPort ViewPort -} - -func (c *Client) ResizeViewPort(cols, rows uint) { - c.ViewPort.Rows = rows - c.ViewPort.Cols = cols -} - -func NewClient(so socketio.Socket, session *Session) *Client { - so.Join(session.Id) - - c := &Client{so: so, Id: so.Id()} - - so.On("session close", func() { - CloseSession(session) - }) - - so.On("terminal in", func(name, data string) { - // User wrote something on the terminal. Need to write it to the instance terminal - instance := GetInstance(session, name) - if instance != nil && instance.conn != nil && len(data) > 0 { - instance.conn.Conn.Write([]byte(data)) - } - }) - - so.On("viewport resize", func(cols, rows uint) { - // User resized his viewport - c.ResizeViewPort(cols, rows) - vp := session.GetSmallestViewPort() - // Resize all terminals in the session - wsServer.BroadcastTo(session.Id, "viewport resize", vp.Cols, vp.Rows) - for _, instance := range session.Instances { - err := instance.ResizeTerminal(vp.Cols, vp.Rows) - if err != nil { - log.Println("Error resizing terminal", err) - } - } - }) - - so.On("disconnection", func() { - // Client has disconnected. Remove from session and recheck terminal sizes. - for i, cl := range session.clients { - if cl.Id == c.Id { - session.clients = append(session.clients[:i], session.clients[i+1:]...) - break - } - } - if len(session.clients) > 0 { - vp := session.GetSmallestViewPort() - // Resize all terminals in the session - wsServer.BroadcastTo(session.Id, "viewport resize", vp.Cols, vp.Rows) - for _, instance := range session.Instances { - instance.ResizeTerminal(vp.Cols, vp.Rows) - } - } - setGauges() - }) - return c -} diff --git a/services/docker.go b/services/docker.go deleted file mode 100644 index bed7ece..0000000 --- a/services/docker.go +++ /dev/null @@ -1,391 +0,0 @@ -package services - -import ( - "archive/tar" - "bytes" - "fmt" - "io" - "io/ioutil" - "log" - "os" - "strconv" - "strings" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/network" - "github.com/docker/docker/client" - "golang.org/x/net/context" -) - -var c *client.Client - -const ( - Byte = 1 - Kilobyte = 1024 * Byte - Megabyte = 1024 * Kilobyte -) - -func init() { - var err error - c, err = client.NewEnvClient() - if err != nil { - // this wont happen if daemon is offline, only for some critical errors - log.Fatal("Cannot initialize docker client") - } - -} - -func GetContainerStats(id string) (io.ReadCloser, error) { - stats, err := c.ContainerStats(context.Background(), id, false) - - return stats.Body, err -} - -func GetContainerInfo(id string) (types.ContainerJSON, error) { - return c.ContainerInspect(context.Background(), id) -} - -func GetDaemonInfo(i *Instance) (types.Info, error) { - if i.dockerClient == nil { - return types.Info{}, fmt.Errorf("Docker client for DinD (%s) is not ready", i.IP) - } - return i.dockerClient.Info(context.Background()) -} - -func SetInstanceSwarmPorts(i *Instance) error { - if i.dockerClient == nil { - return fmt.Errorf("Docker client for DinD (%s) is not ready", i.IP) - } - - hostnamesIdx := map[string]*Instance{} - for _, ins := range i.session.Instances { - hostnamesIdx[ins.Hostname] = ins - } - - nodesIdx := map[string]*Instance{} - nodes, nodesErr := i.dockerClient.NodeList(context.Background(), types.NodeListOptions{}) - if nodesErr != nil { - return nodesErr - } - for _, n := range nodes { - nodesIdx[n.ID] = hostnamesIdx[n.Description.Hostname] - } - - tasks, err := i.dockerClient.TaskList(context.Background(), types.TaskListOptions{}) - if err != nil { - return err - } - services := map[string][]uint16{} - for _, t := range tasks { - services[t.ServiceID] = []uint16{} - } - for serviceID, _ := range services { - s, _, err := i.dockerClient.ServiceInspectWithRaw(context.Background(), serviceID, types.ServiceInspectOptions{}) - if err != nil { - return err - } - for _, p := range s.Endpoint.Ports { - services[serviceID] = append(services[serviceID], uint16(p.PublishedPort)) - } - } - for _, t := range tasks { - for _, n := range nodes { - ins := nodesIdx[n.ID] - if ins != nil { - for _, p := range services[t.ServiceID] { - ins.setUsedPort(p) - } - } - } - } - - return nil -} - -func GetUsedPorts(i *Instance) ([]uint16, error) { - if i.dockerClient == nil { - return nil, fmt.Errorf("Docker client for DinD (%s) is not ready", i.IP) - } - opts := types.ContainerListOptions{} - containers, err := i.dockerClient.ContainerList(context.Background(), opts) - if err != nil { - return nil, err - } - - openPorts := []uint16{} - for _, c := range containers { - for _, p := range c.Ports { - // When port is not published on the host docker return public port as 0, so we need to avoid it - if p.PublicPort != 0 { - openPorts = append(openPorts, p.PublicPort) - } - } - } - - return openPorts, nil -} - -func CreateNetwork(name string) error { - opts := types.NetworkCreate{Driver: "overlay", Attachable: true} - _, err := c.NetworkCreate(context.Background(), name, opts) - - if err != nil { - log.Printf("Starting session err [%s]\n", err) - - return err - } - - return nil -} -func ConnectNetwork(containerId, networkId, ip string) (string, error) { - settings := &network.EndpointSettings{} - if ip != "" { - settings.IPAddress = ip - } - err := c.NetworkConnect(context.Background(), networkId, containerId, settings) - - if err != nil && !strings.Contains(err.Error(), "already exists") { - log.Printf("Connection container to network err [%s]\n", err) - - return "", err - } - - // Obtain the IP of the PWD container in this network - container, err := c.ContainerInspect(context.Background(), containerId) - if err != nil { - return "", err - } - - n, found := container.NetworkSettings.Networks[networkId] - if !found { - return "", fmt.Errorf("Container [%s] connected to the network [%s] but couldn't obtain it's IP address", containerId, networkId) - } - - return n.IPAddress, nil -} - -func DisconnectNetwork(containerId, networkId string) error { - err := c.NetworkDisconnect(context.Background(), networkId, containerId, true) - - if err != nil { - log.Printf("Disconnection of container from network err [%s]\n", err) - - return err - } - - return nil -} - -func DeleteNetwork(id string) error { - err := c.NetworkRemove(context.Background(), id) - - if err != nil { - return err - } - - return nil -} - -func CreateAttachConnection(id string, ctx context.Context) (*types.HijackedResponse, error) { - - conf := types.ContainerAttachOptions{true, true, true, true, "ctrl-^,ctrl-^", true} - conn, err := c.ContainerAttach(ctx, id, conf) - if err != nil { - return nil, err - } - - return &conn, nil -} - -func ResizeConnection(name string, cols, rows uint) error { - return c.ContainerResize(context.Background(), name, types.ResizeOptions{Height: rows, Width: cols}) -} - -func CopyToContainer(containerName, destination, fileName string, content io.Reader) error { - r, w := io.Pipe() - b, readErr := ioutil.ReadAll(content) - if readErr != nil { - return readErr - } - t := tar.NewWriter(w) - go func() { - t.WriteHeader(&tar.Header{Name: fileName, Mode: 0600, Size: int64(len(b))}) - t.Write(b) - t.Close() - w.Close() - }() - return c.CopyToContainer(context.Background(), containerName, destination, r, types.CopyToContainerOptions{AllowOverwriteDirWithFile: true}) -} - -func CreateInstance(session *Session, conf InstanceConfig) (*Instance, error) { - var nodeName string - var containerName string - for i := 1; ; i++ { - nodeName = fmt.Sprintf("node%d", i) - containerName = fmt.Sprintf("%s_%s", session.Id[:8], nodeName) - exists := false - for _, instance := range session.Instances { - if instance.Name == containerName { - exists = true - break - } - } - if !exists { - break - } - } - - // Make sure directories are available for the new instance container - containerDir := "/var/run/pwd" - containerCertDir := fmt.Sprintf("%s/certs", containerDir) - - env := []string{} - - // Write certs to container cert dir - if len(conf.ServerCert) > 0 { - env = append(env, `DOCKER_TLSCERT=\/var\/run\/pwd\/certs\/cert.pem`) - } - if len(conf.ServerKey) > 0 { - env = append(env, `DOCKER_TLSKEY=\/var\/run\/pwd\/certs\/key.pem`) - } - if len(conf.CACert) > 0 { - // if ca cert is specified, verify that clients that connects present a certificate signed by the CA - env = append(env, `DOCKER_TLSCACERT=\/var\/run\/pwd\/certs\/ca.pem`) - } - if len(conf.ServerCert) > 0 || len(conf.ServerKey) > 0 || len(conf.CACert) > 0 { - // if any of the certs is specified, enable TLS - env = append(env, "DOCKER_TLSENABLE=true") - } else { - env = append(env, "DOCKER_TLSENABLE=false") - } - - h := &container.HostConfig{ - NetworkMode: container.NetworkMode(session.Id), - Privileged: true, - AutoRemove: true, - LogConfig: container.LogConfig{Config: map[string]string{"max-size": "10m", "max-file": "1"}}, - } - - if os.Getenv("APPARMOR_PROFILE") != "" { - h.SecurityOpt = []string{fmt.Sprintf("apparmor=%s", os.Getenv("APPARMOR_PROFILE"))} - } - - var pidsLimit = int64(1000) - if envLimit := os.Getenv("MAX_PROCESSES"); envLimit != "" { - if i, err := strconv.Atoi(envLimit); err == nil { - pidsLimit = int64(i) - } - } - h.Resources.PidsLimit = pidsLimit - h.Resources.Memory = 4092 * Megabyte - t := true - h.Resources.OomKillDisable = &t - - env = append(env, fmt.Sprintf("PWD_IP_ADDRESS=%s", session.PwdIpAddress)) - cf := &container.Config{Hostname: nodeName, - Image: dindImage, - Tty: true, - OpenStdin: true, - AttachStdin: true, - AttachStdout: true, - AttachStderr: true, - Env: env, - } - networkConf := &network.NetworkingConfig{ - map[string]*network.EndpointSettings{ - session.Id: &network.EndpointSettings{Aliases: []string{nodeName}}, - }, - } - container, err := c.ContainerCreate(context.Background(), cf, h, networkConf, containerName) - - if err != nil { - return nil, err - } - - if err := copyIfSet(conf.ServerCert, "cert.pem", containerCertDir, containerName); err != nil { - return nil, err - } - if err := copyIfSet(conf.ServerKey, "key.pem", containerCertDir, containerName); err != nil { - return nil, err - } - if err := copyIfSet(conf.CACert, "ca.pem", containerCertDir, containerName); err != nil { - return nil, err - } - - err = c.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}) - if err != nil { - return nil, err - } - - cinfo, err := GetContainerInfo(container.ID) - if err != nil { - return nil, err - } - - return &Instance{ - Name: containerName, - Hostname: cinfo.Config.Hostname, - IP: cinfo.NetworkSettings.Networks[session.Id].IPAddress, - }, nil -} - -func copyIfSet(content []byte, fileName, path, containerName string) error { - if len(content) > 0 { - return CopyToContainer(containerName, path, fileName, bytes.NewReader(content)) - } - return nil -} - -func DeleteContainer(id string) error { - return c.ContainerRemove(context.Background(), id, types.ContainerRemoveOptions{Force: true, RemoveVolumes: true}) -} - -func Exec(instanceName string, command []string) (int, error) { - e, err := c.ContainerExecCreate(context.Background(), instanceName, types.ExecConfig{Cmd: command}) - if err != nil { - return 0, err - } - err = c.ContainerExecStart(context.Background(), e.ID, types.ExecStartCheck{}) - if err != nil { - return 0, err - } - var ins types.ContainerExecInspect - for _ = range time.Tick(1 * time.Second) { - ins, err = c.ContainerExecInspect(context.Background(), e.ID) - if ins.Running { - continue - } - if err != nil { - return 0, err - } - break - } - return ins.ExitCode, nil - -} -func ExecAttach(instanceName string, command []string, out io.Writer) (int, error) { - e, err := c.ContainerExecCreate(context.Background(), instanceName, types.ExecConfig{Cmd: command, AttachStdout: true, AttachStderr: true, Tty: true}) - if err != nil { - return 0, err - } - resp, err := c.ContainerExecAttach(context.Background(), e.ID, types.ExecConfig{AttachStdout: true, AttachStderr: true, Tty: true}) - if err != nil { - return 0, err - } - io.Copy(out, resp.Reader) - var ins types.ContainerExecInspect - for _ = range time.Tick(1 * time.Second) { - ins, err = c.ContainerExecInspect(context.Background(), e.ID) - if ins.Running { - continue - } - if err != nil { - return 0, err - } - break - } - return ins.ExitCode, nil - -} diff --git a/services/instance.go b/services/instance.go deleted file mode 100644 index 384f155..0000000 --- a/services/instance.go +++ /dev/null @@ -1,235 +0,0 @@ -package services - -import ( - "context" - "fmt" - "io" - "log" - "net/http" - "os" - "path/filepath" - "strings" - "sync" - - "golang.org/x/text/encoding" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/client" -) - -var rw sync.Mutex - -type UInt16Slice []uint16 - -func (p UInt16Slice) Len() int { return len(p) } -func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -type Instance struct { - session *Session `json:"-"` - Name string `json:"name"` - Hostname string `json:"hostname"` - IP string `json:"ip"` - conn *types.HijackedResponse `json:"-"` - ctx context.Context `json:"-"` - dockerClient *client.Client `json:"-"` - IsManager *bool `json:"is_manager"` - Mem string `json:"mem"` - Cpu string `json:"cpu"` - Alias string `json:"alias"` - tempPorts []uint16 `json:"-"` - ServerCert []byte `json:"server_cert"` - ServerKey []byte `json:"server_key"` - CACert []byte `json:"ca_cert"` - Cert []byte `json:"cert"` - Key []byte `json:"key"` - Ports UInt16Slice -} - -type InstanceConfig struct { - ImageName string - Alias string - ServerCert []byte - ServerKey []byte - CACert []byte - Cert []byte - Key []byte -} - -func (i *Instance) setUsedPort(port uint16) { - rw.Lock() - defer rw.Unlock() - - for _, p := range i.tempPorts { - if p == port { - return - } - } - i.tempPorts = append(i.tempPorts, port) -} - -func (i *Instance) IsConnected() bool { - return i.conn != nil - -} - -func (i *Instance) SetSession(s *Session) { - i.session = s -} - -var dindImage string - -func init() { - dindImage = getDindImageName() -} - -func getDindImageName() string { - dindImage := os.Getenv("DIND_IMAGE") - defaultDindImageName := "franela/dind" - if len(dindImage) == 0 { - dindImage = defaultDindImageName - } - return dindImage -} - -func NewInstance(session *Session, conf InstanceConfig) (*Instance, error) { - if conf.ImageName == "" { - conf.ImageName = dindImage - } - log.Printf("NewInstance - using image: [%s]\n", conf.ImageName) - instance, err := CreateInstance(session, conf) - if err != nil { - return nil, err - } - - instance.Alias = conf.Alias - instance.Cert = conf.Cert - instance.Key = conf.Key - instance.ServerCert = conf.ServerCert - instance.ServerKey = conf.ServerKey - instance.CACert = conf.CACert - instance.session = session - - if session.Instances == nil { - session.Instances = make(map[string]*Instance) - } - session.Instances[instance.Name] = instance - - go instance.Attach() - - err = saveSessionsToDisk() - if err != nil { - return nil, err - } - - wsServer.BroadcastTo(session.Id, "new instance", instance.Name, instance.IP, instance.Hostname) - - setGauges() - - return instance, nil -} - -type sessionWriter struct { - instance *Instance -} - -func (s *sessionWriter) Write(p []byte) (n int, err error) { - wsServer.BroadcastTo(s.instance.session.Id, "terminal out", s.instance.Name, string(p)) - return len(p), nil -} - -func (i *Instance) ResizeTerminal(cols, rows uint) error { - return ResizeConnection(i.Name, cols, rows) -} - -func (i *Instance) Attach() { - i.ctx = context.Background() - conn, err := CreateAttachConnection(i.Name, i.ctx) - - if err != nil { - return - } - - i.conn = conn - - go func() { - encoder := encoding.Replacement.NewEncoder() - sw := &sessionWriter{instance: i} - io.Copy(encoder.Writer(sw), conn.Reader) - }() - - select { - case <-i.ctx.Done(): - } -} - -func (i *Instance) UploadFromURL(url string) error { - log.Printf("Downloading file [%s]\n", url) - resp, err := http.Get(url) - if err != nil { - return fmt.Errorf("Could not download file [%s]. Error: %s\n", url, err) - } - defer resp.Body.Close() - if resp.StatusCode != 200 { - return fmt.Errorf("Could not download file [%s]. Status code: %d\n", url, resp.StatusCode) - } - - _, fileName := filepath.Split(url) - - copyErr := CopyToContainer(i.Name, "/var/run/pwd/uploads", fileName, resp.Body) - - if copyErr != nil { - return fmt.Errorf("Error while downloading file [%s]. Error: %s\n", url, copyErr) - } - - return nil -} - -func GetInstance(session *Session, name string) *Instance { - return session.Instances[name] -} - -func FindInstanceByIP(ip string) *Instance { - for _, s := range sessions { - for _, i := range s.Instances { - if i.IP == ip { - return i - } - } - } - return nil -} - -func FindInstanceByAlias(sessionPrefix, alias string) *Instance { - for id, s := range sessions { - if strings.HasPrefix(id, sessionPrefix) { - for _, i := range s.Instances { - if i.Alias == alias { - return i - } - } - } - } - return nil -} - -func DeleteInstance(session *Session, instance *Instance) error { - if instance.conn != nil { - instance.conn.Close() - } - err := DeleteContainer(instance.Name) - if err != nil && !strings.Contains(err.Error(), "No such container") { - log.Println(err) - return err - } - - wsServer.BroadcastTo(session.Id, "delete instance", instance.Name) - - delete(session.Instances, instance.Name) - if err := saveSessionsToDisk(); err != nil { - return err - } - setGauges() - - return nil -} diff --git a/services/instance_images.go b/services/instance_images.go deleted file mode 100644 index 9d6c04d..0000000 --- a/services/instance_images.go +++ /dev/null @@ -1,10 +0,0 @@ -package services - -func InstanceImages() []string { - - return []string{ - dindImage, - "franela/dind:overlay2-dev", - } - -} diff --git a/services/session.go b/services/session.go deleted file mode 100644 index 7ea360a..0000000 --- a/services/session.go +++ /dev/null @@ -1,428 +0,0 @@ -package services - -import ( - "crypto/tls" - "encoding/gob" - "fmt" - "log" - "math" - "net" - "net/http" - "os" - "path" - "sort" - "strings" - "sync" - "time" - - "github.com/docker/docker/api" - "github.com/docker/docker/client" - "github.com/docker/go-connections/tlsconfig" - "github.com/googollee/go-socket.io" - "github.com/play-with-docker/play-with-docker/config" - "github.com/prometheus/client_golang/prometheus" - "github.com/twinj/uuid" -) - -var ( - sessionsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "sessions", - Help: "Sessions", - }) - clientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "clients", - Help: "Clients", - }) - instancesGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "instances", - Help: "Instances", - }) -) - -func init() { - prometheus.MustRegister(sessionsGauge) - prometheus.MustRegister(clientsGauge) - prometheus.MustRegister(instancesGauge) -} - -var wsServer *socketio.Server - -type Session struct { - rw sync.Mutex - Id string `json:"id"` - Instances map[string]*Instance `json:"instances"` - clients []*Client `json:"-"` - CreatedAt time.Time `json:"created_at"` - ExpiresAt time.Time `json:"expires_at"` - scheduled bool `json:"-"` - ticker *time.Ticker `json:"-"` - PwdIpAddress string `json:"pwd_ip_address"` - Ready bool `json:"ready"` - Stack string `json:"stack"` -} - -type sessionBuilderWriter struct { - session *Session -} - -func (s *sessionBuilderWriter) Write(p []byte) (n int, err error) { - wsServer.BroadcastTo(s.session.Id, "session builder out", string(p)) - return len(p), nil -} - -func (s *Session) Lock() { - s.rw.Lock() -} - -func (s *Session) Unlock() { - s.rw.Unlock() -} - -func (s *Session) GetSmallestViewPort() ViewPort { - minRows := s.clients[0].ViewPort.Rows - minCols := s.clients[0].ViewPort.Cols - - for _, c := range s.clients { - minRows = uint(math.Min(float64(minRows), float64(c.ViewPort.Rows))) - minCols = uint(math.Min(float64(minCols), float64(c.ViewPort.Cols))) - } - - return ViewPort{Rows: minRows, Cols: minCols} -} - -func (s *Session) DeployStack() error { - s.Lock() - defer s.Unlock() - - if s.Ready { - // a stack was already deployed on this session, just ignore - return nil - } - - s.setReady(false) - i, err := NewInstance(s, InstanceConfig{}) - if err != nil { - log.Printf("Error creating instance for stack [%s]: %s\n", s.Stack, err) - return err - } - err = i.UploadFromURL("https://raw.githubusercontent.com/play-with-docker/stacks/master" + s.Stack) - if err != nil { - log.Printf("Error uploading stack file [%s]: %s\n", s.Stack, err) - return err - } - - w := sessionBuilderWriter{session: s} - fileName := path.Base(s.Stack) - code, err := ExecAttach(i.Name, []string{"docker-compose", "-f", "/var/run/pwd/uploads/" + fileName, "up", "-d"}, &w) - if err != nil { - log.Printf("Error executing stack [%s]: %s\n", s.Stack, err) - return err - } - - log.Printf("Stack execution finished with code %d\n", code) - s.setReady(true) - if err := saveSessionsToDisk(); err != nil { - return err - } - return nil -} - -func (s *Session) setReady(ready bool) { - s.Ready = ready - wsServer.BroadcastTo(s.Id, "session ready", s.Ready) -} - -func (s *Session) AddNewClient(c *Client) { - s.clients = append(s.clients, c) - setGauges() -} - -func (s *Session) SchedulePeriodicTasks() { - if s.scheduled { - return - } - - go func() { - s.scheduled = true - - s.ticker = time.NewTicker(1 * time.Second) - for range s.ticker.C { - var wg = sync.WaitGroup{} - wg.Add(len(s.Instances)) - for _, ins := range s.Instances { - var i *Instance = ins - if i.dockerClient == nil { - // Need to create client to the DinD docker daemon - - // We check if the client needs to use TLS - var tlsConfig *tls.Config - if len(i.Cert) > 0 && len(i.Key) > 0 { - tlsConfig = tlsconfig.ClientDefault() - tlsConfig.InsecureSkipVerify = true - tlsCert, err := tls.X509KeyPair(i.Cert, i.Key) - if err != nil { - log.Println("Could not load X509 key pair: %v. Make sure the key is not encrypted", err) - continue - } - tlsConfig.Certificates = []tls.Certificate{tlsCert} - } - - transport := &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 1 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext} - if tlsConfig != nil { - transport.TLSClientConfig = tlsConfig - } - cli := &http.Client{ - Transport: transport, - } - c, err := client.NewClient(fmt.Sprintf("http://%s:2375", i.IP), api.DefaultVersion, cli, nil) - if err != nil { - log.Println("Could not connect to DinD docker daemon", err) - } else { - i.dockerClient = c - } - } - go func() { - defer wg.Done() - for _, t := range periodicTasks { - err := t.Run(i) - if err != nil { - if strings.Contains(err.Error(), "No such container") { - log.Printf("Container for instance [%s] doesn't exist any more. Deleting from session.\n", i.IP) - DeleteInstance(i.session, i) - } else { - log.Println(err) - } - break - } - } - }() - } - wg.Wait() - // broadcast all information - for _, ins := range s.Instances { - ins.Ports = UInt16Slice(ins.tempPorts) - sort.Sort(ins.Ports) - ins.tempPorts = []uint16{} - - wsServer.BroadcastTo(ins.session.Id, "instance stats", ins.Name, ins.Mem, ins.Cpu, ins.IsManager, ins.Ports) - } - } - }() -} - -var sessions map[string]*Session - -func init() { - sessions = make(map[string]*Session) -} - -func CreateWSServer() *socketio.Server { - server, err := socketio.NewServer(nil) - if err != nil { - log.Fatal(err) - } - wsServer = server - return server -} - -func CloseSessionAfter(s *Session, d time.Duration) { - time.AfterFunc(d, func() { - CloseSession(s) - }) -} - -func CloseSession(s *Session) error { - s.rw.Lock() - defer s.rw.Unlock() - - if s.ticker != nil { - s.ticker.Stop() - } - wsServer.BroadcastTo(s.Id, "session end") - for _, c := range s.clients { - c.so.Emit("disconnect") - } - log.Printf("Starting clean up of session [%s]\n", s.Id) - for _, i := range s.Instances { - err := DeleteInstance(s, i) - if err != nil { - log.Println(err) - return err - } - } - // Disconnect PWD daemon from the network - if err := DisconnectNetwork("pwd", s.Id); err != nil { - if !strings.Contains(err.Error(), "is not connected to the network") { - log.Println("ERROR NETWORKING") - return err - } - } - log.Printf("Disconnected pwd from network [%s]\n", s.Id) - if err := DeleteNetwork(s.Id); err != nil { - if !strings.Contains(err.Error(), "not found") { - log.Println(err) - return err - } - } - delete(sessions, s.Id) - - // We store sessions as soon as we delete one - if err := saveSessionsToDisk(); err != nil { - return err - } - setGauges() - log.Printf("Cleaned up session [%s]\n", s.Id) - return nil -} - -var defaultDuration = 4 * time.Hour - -func GetDuration(reqDur string) time.Duration { - if reqDur != "" { - if dur, err := time.ParseDuration(reqDur); err == nil && dur <= defaultDuration { - return dur - } - return defaultDuration - } - - envDur := os.Getenv("EXPIRY") - if dur, err := time.ParseDuration(envDur); err == nil { - return dur - } - - return defaultDuration -} - -func NewSession(duration time.Duration, stack string) (*Session, error) { - s := &Session{} - s.Id = uuid.NewV4().String() - s.Instances = map[string]*Instance{} - s.CreatedAt = time.Now() - s.ExpiresAt = s.CreatedAt.Add(duration) - if stack == "" { - s.Ready = true - } - s.Stack = stack - log.Printf("NewSession id=[%s]\n", s.Id) - - // Schedule cleanup of the session - CloseSessionAfter(s, duration) - - if err := CreateNetwork(s.Id); err != nil { - log.Println("ERROR NETWORKING") - return nil, err - } - log.Printf("Network [%s] created for session [%s]\n", s.Id, s.Id) - - // Connect PWD daemon to the new network - ip, err := ConnectNetwork(config.PWDContainerName, s.Id, "") - if err != nil { - log.Println("ERROR NETWORKING") - return nil, err - } - s.PwdIpAddress = ip - log.Printf("Connected %s to network [%s]\n", config.PWDContainerName, s.Id) - - // Schedule peridic tasks execution - s.SchedulePeriodicTasks() - - sessions[s.Id] = s - - // We store sessions as soon as we create one so we don't delete new sessions on an api restart - if err := saveSessionsToDisk(); err != nil { - return nil, err - } - - setGauges() - return s, nil -} - -func GetSession(sessionId string) *Session { - s := sessions[sessionId] - if s != nil { - for _, instance := range s.Instances { - if !instance.IsConnected() { - instance.SetSession(s) - go instance.Attach() - } - } - - } - return s -} - -func setGauges() { - var ins float64 - var cli float64 - - for _, s := range sessions { - ins += float64(len(s.Instances)) - cli += float64(len(s.clients)) - } - - clientsGauge.Set(cli) - instancesGauge.Set(ins) - sessionsGauge.Set(float64(len(sessions))) -} - -func LoadSessionsFromDisk() error { - file, err := os.Open(config.SessionsFile) - if err == nil { - decoder := gob.NewDecoder(file) - err = decoder.Decode(&sessions) - - if err != nil { - return err - } - - // schedule session expiration - for _, s := range sessions { - timeLeft := s.ExpiresAt.Sub(time.Now()) - CloseSessionAfter(s, timeLeft) - - // start collecting stats for every instance - for _, i := range s.Instances { - // wire the session back to the instance - i.session = s - } - - // Connect PWD daemon to the new network - if s.PwdIpAddress == "" { - log.Fatal("Cannot load stored sessions as they don't have the pwd ip address stored with them") - } - if _, err := ConnectNetwork(config.PWDContainerName, s.Id, s.PwdIpAddress); err != nil { - if strings.Contains(err.Error(), "Could not attach to network") { - log.Printf("Network for session [%s] doesn't exist. Removing all instances and session.", s.Id) - CloseSession(s) - } else { - log.Println("ERROR NETWORKING", err) - return err - } - } else { - log.Printf("Connected %s to network [%s]\n", config.PWDContainerName, s.Id) - - // Schedule peridic tasks execution - s.SchedulePeriodicTasks() - } - } - } - file.Close() - setGauges() - return err -} - -func saveSessionsToDisk() error { - rw.Lock() - defer rw.Unlock() - file, err := os.Create(config.SessionsFile) - if err == nil { - encoder := gob.NewEncoder(file) - err = encoder.Encode(&sessions) - } - file.Close() - return err -} diff --git a/services/task.go b/services/task.go deleted file mode 100644 index 30034ec..0000000 --- a/services/task.go +++ /dev/null @@ -1,11 +0,0 @@ -package services - -type periodicTask interface { - Run(i *Instance) error -} - -var periodicTasks []periodicTask - -func init() { - periodicTasks = append(periodicTasks, &collectStatsTask{}, &checkSwarmStatusTask{}, &checkUsedPortsTask{}, &checkSwarmUsedPortsTask{}) -} diff --git a/templates/welcome.go b/templates/welcome.go index 1e5de51..92f3d30 100644 --- a/templates/welcome.go +++ b/templates/welcome.go @@ -4,7 +4,7 @@ import ( "bytes" "html/template" - "github.com/play-with-docker/play-with-docker/services" + "github.com/play-with-docker/play-with-docker/recaptcha" ) func GetWelcomeTemplate() ([]byte, error) { @@ -13,7 +13,7 @@ func GetWelcomeTemplate() ([]byte, error) { return nil, tplErr } var b bytes.Buffer - tplExecuteErr := welcomeTemplate.ExecuteTemplate(&b, "GOOGLE_RECAPTCHA_SITE_KEY", services.GetGoogleRecaptchaSiteKey()) + tplExecuteErr := welcomeTemplate.ExecuteTemplate(&b, "GOOGLE_RECAPTCHA_SITE_KEY", recaptcha.GetGoogleRecaptchaSiteKey()) if tplExecuteErr != nil { return nil, tplExecuteErr }