Jonathan Leibiusky @xetorthio
2017-05-23 19:29:36 -03:00
parent 911d56bc49
commit 3d96760a98
41 changed files with 1454 additions and 1329 deletions

pwd/broadcast.go Normal file

@@ -0,0 +1,34 @@
package pwd
import (
"net/http"
"github.com/googollee/go-socket.io"
)
type BroadcastApi interface {
BroadcastTo(sessionId, eventName string, args ...interface{})
GetHandler() http.Handler
}
type broadcast struct {
sio *socketio.Server
}
func (b *broadcast) BroadcastTo(sessionId, eventName string, args ...interface{}) {
b.sio.BroadcastTo(sessionId, eventName, args...)
}
func (b *broadcast) GetHandler() http.Handler {
return b.sio
}
func NewBroadcast(connectionEvent, errorEvent interface{}) (*broadcast, error) {
server, err := socketio.NewServer(nil)
if err != nil {
return nil, err
}
server.On("connection", connectionEvent)
server.On("error", errorEvent)
return &broadcast{sio: server}, nil
}
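
Editor's note: the new BroadcastApi is a thin wrapper over go-socket.io. A minimal wiring sketch, not part of this commit, assuming the classic go-socket.io v1 handler signatures; the route, port and handler bodies are illustrative only.

package main

import (
	"log"
	"net/http"

	socketio "github.com/googollee/go-socket.io"
	"github.com/play-with-docker/play-with-docker/pwd"
)

func main() {
	// The two handlers are forwarded verbatim to server.On("connection") / server.On("error").
	onConnect := func(so socketio.Socket) {
		log.Println("client connected:", so.Id())
	}
	onError := func(so socketio.Socket, err error) {
		log.Println("socket.io error:", err)
	}

	b, err := pwd.NewBroadcast(onConnect, onError)
	if err != nil {
		log.Fatal(err)
	}

	// GetHandler exposes the underlying socket.io server as an http.Handler.
	mux := http.NewServeMux()
	mux.Handle("/sessions/ws/", b.GetHandler())
	log.Fatal(http.ListenAndServe(":3000", mux))
}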


@@ -0,0 +1,12 @@
package pwd
import "net/http"
type mockBroadcast struct {
}
func (m *mockBroadcast) BroadcastTo(sessionId, eventName string, args ...interface{}) {
}
func (m *mockBroadcast) GetHandler() http.Handler {
return nil
}


@@ -0,0 +1,24 @@
package pwd
import (
"log"
"github.com/docker/docker/api/types/swarm"
)
type checkSwarmStatusTask struct {
}
func (c checkSwarmStatusTask) Run(i *Instance) error {
if info, err := i.docker.GetDaemonInfo(); err == nil {
if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked {
i.IsManager = &info.Swarm.ControlAvailable
} else {
i.IsManager = nil
}
} else {
log.Println(err)
return err
}
return nil
}


@@ -0,0 +1,30 @@
package pwd
import (
"fmt"
"log"
)
type checkSwarmUsedPortsTask struct {
}
func (c checkSwarmUsedPortsTask) Run(i *Instance) error {
if i.IsManager != nil && *i.IsManager {
sessionPrefix := i.session.Id[:8]
// This is a swarm manager instance, so check which ports are used across the swarm
if hosts, ports, err := i.docker.GetSwarmPorts(); err != nil {
log.Println(err)
return err
} else {
for _, host := range hosts {
host = fmt.Sprintf("%s_%s", sessionPrefix, host)
for _, port := range ports {
if i.session.Instances[host] != nil {
i.session.Instances[host].setUsedPort(port)
}
}
}
}
}
return nil
}


@@ -0,0 +1,18 @@
package pwd
import "log"
type checkUsedPortsTask struct {
}
func (c checkUsedPortsTask) Run(i *Instance) error {
if ports, err := i.docker.GetPorts(); err == nil {
for _, p := range ports {
i.setUsedPort(uint16(p))
}
} else {
log.Println(err)
return err
}
return nil
}

pwd/client.go Normal file

@@ -0,0 +1,54 @@
package pwd
import "log"
type Client struct {
Id string
viewPort ViewPort
session *Session
}
type ViewPort struct {
Rows uint
Cols uint
}
func (p *pwd) ClientNew(id string, session *Session) *Client {
c := &Client{Id: id, session: session}
session.clients = append(session.clients, c)
return c
}
func (p *pwd) ClientResizeViewPort(c *Client, cols, rows uint) {
c.viewPort.Rows = rows
c.viewPort.Cols = cols
p.notifyClientSmallestViewPort(c.session)
}
func (p *pwd) ClientClose(client *Client) {
// Client has disconnected. Remove from session and recheck terminal sizes.
session := client.session
for i, cl := range session.clients {
if cl.Id == client.Id {
session.clients = append(session.clients[:i], session.clients[i+1:]...)
break
}
}
if len(session.clients) > 0 {
p.notifyClientSmallestViewPort(session)
}
setGauges()
}
func (p *pwd) notifyClientSmallestViewPort(session *Session) {
vp := p.SessionGetSmallestViewPort(session)
// Resize all terminals in the session
p.broadcast.BroadcastTo(session.Id, "viewport resize", vp.Cols, vp.Rows)
for _, instance := range session.Instances {
err := p.InstanceResizeTerminal(instance, vp.Rows, vp.Cols)
if err != nil {
log.Println("Error resizing terminal", err)
}
}
}
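
Editor's sketch (not in the commit): the resize flow can be exercised end to end with the mocks added in this change plus a throwaway BroadcastApi that records events; the session and client ids below are illustrative.

package pwd

import (
	"net/http"
	"testing"
)

// recordingBroadcast captures broadcast event names for this sketch only.
type recordingBroadcast struct {
	events []string
}

func (r *recordingBroadcast) BroadcastTo(sessionId, eventName string, args ...interface{}) {
	r.events = append(r.events, eventName)
}

func (r *recordingBroadcast) GetHandler() http.Handler { return nil }

func TestClientResizeSketch(t *testing.T) {
	rb := &recordingBroadcast{}
	p := NewPWD(&mockDocker{}, &mockTasks{}, rb, &mockStorage{})

	s := &Session{Id: "aaaabbbb-0000", Instances: map[string]*Instance{}}
	c1 := p.ClientNew("client-1", s)
	c2 := p.ClientNew("client-2", s)

	// Two clients at 120x40 and 80x24: the smallest viewport wins.
	p.ClientResizeViewPort(c1, 120, 40)
	p.ClientResizeViewPort(c2, 80, 24)

	if len(rb.events) == 0 || rb.events[len(rb.events)-1] != "viewport resize" {
		t.Fatalf("expected a viewport resize broadcast, got %v", rb.events)
	}
}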

pwd/collect_stats_task.go Normal file

@@ -0,0 +1,69 @@
package pwd
import (
"encoding/json"
"fmt"
"log"
"github.com/docker/docker/api/types"
units "github.com/docker/go-units"
"github.com/play-with-docker/play-with-docker/docker"
)
type collectStatsTask struct {
mem float64
memLimit float64
memPercent float64
cpuPercent float64
previousCPU uint64
previousSystem uint64
docker docker.DockerApi
}
func (c collectStatsTask) Run(i *Instance) error {
reader, err := c.docker.GetContainerStats(i.Name)
if err != nil {
log.Println("Error while trying to collect instance stats", err)
return err
}
dec := json.NewDecoder(reader)
var v *types.StatsJSON
e := dec.Decode(&v)
if e != nil {
log.Println("Error while trying to collect instance stats", e)
return e
}
// Memory
if v.MemoryStats.Limit != 0 {
c.memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
}
c.mem = float64(v.MemoryStats.Usage)
c.memLimit = float64(v.MemoryStats.Limit)
i.Mem = fmt.Sprintf("%.2f%% (%s / %s)", c.memPercent, units.BytesSize(c.mem), units.BytesSize(c.memLimit))
// cpu
c.previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
c.previousSystem = v.PreCPUStats.SystemUsage
c.cpuPercent = calculateCPUPercentUnix(c.previousCPU, c.previousSystem, v)
i.Cpu = fmt.Sprintf("%.2f%%", c.cpuPercent)
return nil
}
func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
var (
cpuPercent = 0.0
// calculate the change for the cpu usage of the container in between readings
cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
// calculate the change for the entire system between readings
systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
)
if systemDelta > 0.0 && cpuDelta > 0.0 {
cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
}
return cpuPercent
}
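
Editor's note: as a quick sanity check of calculateCPUPercentUnix, here is a worked example with made-up counters (not part of the commit). The container used 20ms of CPU while total system time advanced 100ms on a 4-core host, so the result should be (20/100) * 4 * 100 = 80%.

package pwd

import "github.com/docker/docker/api/types"

// exampleCPUPercent is illustrative only; the numbers are invented.
func exampleCPUPercent() float64 {
	v := &types.StatsJSON{}
	v.CPUStats.CPUUsage.TotalUsage = 120000000             // 120ms cumulative container CPU
	v.CPUStats.CPUUsage.PercpuUsage = []uint64{0, 0, 0, 0} // 4 cores
	v.CPUStats.SystemUsage = 1100000000                    // 1100ms cumulative system CPU

	previousCPU := uint64(100000000)     // 100ms at the previous reading
	previousSystem := uint64(1000000000) // 1000ms at the previous reading

	return calculateCPUPercentUnix(previousCPU, previousSystem, v) // 80.0
}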


@@ -1,9 +1,68 @@
package pwd
import (
"io"
"net"
"github.com/docker/docker/api/types"
"github.com/play-with-docker/play-with-docker/docker"
)
type mockDocker struct {
createNetwork func(string) error
connectNetwork func(container, network, ip string) (string, error)
}
func (m *mockDocker) CreateNetwork(id string) error {
if m.createNetwork == nil {
return nil
}
return m.createNetwork(id)
}
func (m *mockDocker) ConnectNetwork(container, network, ip string) (string, error) {
if m.connectNetwork == nil {
return "10.0.0.1", nil
}
return m.connectNetwork(container, network, ip)
}
func (m *mockDocker) GetDaemonInfo() (types.Info, error) {
return types.Info{}, nil
}
func (m *mockDocker) GetSwarmPorts() ([]string, []uint16, error) {
return []string{}, []uint16{}, nil
}
func (m *mockDocker) GetPorts() ([]uint16, error) {
return []uint16{}, nil
}
func (m *mockDocker) GetContainerStats(name string) (io.ReadCloser, error) {
return nil, nil
}
func (m *mockDocker) ContainerResize(name string, rows, cols uint) error {
return nil
}
func (m *mockDocker) CreateAttachConnection(name string) (net.Conn, error) {
return nil, nil
}
func (m *mockDocker) CopyToContainer(containerName, destination, fileName string, content io.Reader) error {
return nil
}
func (m *mockDocker) DeleteContainer(id string) error {
return nil
}
func (m *mockDocker) CreateContainer(opts docker.CreateContainerOpts) (string, error) {
return "", nil
}
func (m *mockDocker) ExecAttach(instanceName string, command []string, out io.Writer) (int, error) {
return 0, nil
}
func (m *mockDocker) DisconnectNetwork(containerId, networkId string) error {
return nil
}
func (m *mockDocker) DeleteNetwork(id string) error {
return nil
}
func (m *mockDocker) Exec(instanceName string, command []string) (int, error) {
return 0, nil
}

pwd/instance.go Normal file

@@ -0,0 +1,266 @@
package pwd
import (
"context"
"fmt"
"io"
"log"
"net"
"net/http"
"path/filepath"
"strings"
"sync"
"github.com/play-with-docker/play-with-docker/config"
"github.com/play-with-docker/play-with-docker/docker"
"golang.org/x/text/encoding"
)
type sessionWriter struct {
sessionId string
instanceName string
broadcast BroadcastApi
}
func (s *sessionWriter) Write(p []byte) (n int, err error) {
s.broadcast.BroadcastTo(s.sessionId, "terminal out", s.instanceName, string(p))
return len(p), nil
}
type UInt16Slice []uint16
func (p UInt16Slice) Len() int { return len(p) }
func (p UInt16Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p UInt16Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type Instance struct {
rw sync.Mutex
session *Session `json:"-"`
Name string `json:"name"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
conn net.Conn `json:"-"`
ctx context.Context `json:"-"`
docker docker.DockerApi `json:"-"`
IsManager *bool `json:"is_manager"`
Mem string `json:"mem"`
Cpu string `json:"cpu"`
Alias string `json:"alias"`
tempPorts []uint16 `json:"-"`
ServerCert []byte `json:"server_cert"`
ServerKey []byte `json:"server_key"`
CACert []byte `json:"ca_cert"`
Cert []byte `json:"cert"`
Key []byte `json:"key"`
Ports UInt16Slice
}
type InstanceConfig struct {
ImageName string
Alias string
ServerCert []byte
ServerKey []byte
CACert []byte
Cert []byte
Key []byte
}
func (i *Instance) setUsedPort(port uint16) {
i.rw.Lock()
defer i.rw.Unlock()
for _, p := range i.tempPorts {
if p == port {
return
}
}
i.tempPorts = append(i.tempPorts, port)
}
func (i *Instance) IsConnected() bool {
return i.conn != nil
}
func (i *Instance) SetSession(s *Session) {
i.session = s
}
func (p *pwd) InstanceResizeTerminal(instance *Instance, rows, cols uint) error {
return p.docker.ContainerResize(instance.Name, rows, cols)
}
func (p *pwd) InstanceAttachTerminal(instance *Instance) error {
conn, err := p.docker.CreateAttachConnection(instance.Name)
if err != nil {
return err
}
encoder := encoding.Replacement.NewEncoder()
sw := &sessionWriter{sessionId: instance.session.Id, instanceName: instance.Name, broadcast: p.broadcast}
instance.conn = conn
io.Copy(encoder.Writer(sw), conn)
return nil
}
func (p *pwd) InstanceUploadFromUrl(instance *Instance, url string) error {
log.Printf("Downloading file [%s]\n", url)
resp, err := http.Get(url)
if err != nil {
return fmt.Errorf("Could not download file [%s]. Error: %s\n", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("Could not download file [%s]. Status code: %d\n", url, resp.StatusCode)
}
_, fileName := filepath.Split(url)
copyErr := p.docker.CopyToContainer(instance.Name, "/var/run/pwd/uploads", fileName, resp.Body)
if copyErr != nil {
return fmt.Errorf("Error while downloading file [%s]. Error: %s\n", url, copyErr)
}
return nil
}
func (p *pwd) InstanceGet(session *Session, name string) *Instance {
return session.Instances[name]
}
func (p *pwd) InstanceFindByIP(ip string) *Instance {
for _, s := range sessions {
for _, i := range s.Instances {
if i.IP == ip {
return i
}
}
}
return nil
}
func (p *pwd) InstanceFindByAlias(sessionPrefix, alias string) *Instance {
for id, s := range sessions {
if strings.HasPrefix(id, sessionPrefix) {
for _, i := range s.Instances {
if i.Alias == alias {
return i
}
}
}
}
return nil
}
func (p *pwd) InstanceDelete(session *Session, instance *Instance) error {
if instance.conn != nil {
instance.conn.Close()
}
err := p.docker.DeleteContainer(instance.Name)
if err != nil && !strings.Contains(err.Error(), "No such container") {
log.Println(err)
return err
}
p.broadcast.BroadcastTo(session.Id, "delete instance", instance.Name)
delete(session.Instances, instance.Name)
if err := p.storage.Save(); err != nil {
return err
}
setGauges()
return nil
}
func (p *pwd) InstanceNew(session *Session, conf InstanceConfig) (*Instance, error) {
if conf.ImageName == "" {
conf.ImageName = config.GetDindImageName()
}
log.Printf("NewInstance - using image: [%s]\n", conf.ImageName)
var nodeName string
var containerName string
for i := 1; ; i++ {
nodeName = fmt.Sprintf("node%d", i)
containerName = fmt.Sprintf("%s_%s", session.Id[:8], nodeName)
exists := false
for _, instance := range session.Instances {
if instance.Name == containerName {
exists = true
break
}
}
if !exists {
break
}
}
opts := docker.CreateContainerOpts{
Image: config.GetDindImageName(),
SessionId: session.Id,
PwdIpAddress: session.PwdIpAddress,
ContainerName: containerName,
Hostname: nodeName,
ServerCert: conf.ServerCert,
ServerKey: conf.ServerKey,
CACert: conf.CACert,
}
ip, err := p.docker.CreateContainer(opts)
if err != nil {
return nil, err
}
instance := &Instance{}
instance.IP = ip
instance.Name = containerName
instance.Hostname = nodeName
instance.Alias = conf.Alias
instance.Cert = conf.Cert
instance.Key = conf.Key
instance.ServerCert = conf.ServerCert
instance.ServerKey = conf.ServerKey
instance.CACert = conf.CACert
instance.session = session
if session.Instances == nil {
session.Instances = make(map[string]*Instance)
}
session.Instances[instance.Name] = instance
go p.InstanceAttachTerminal(instance)
err = p.storage.Save()
if err != nil {
return nil, err
}
p.broadcast.BroadcastTo(session.Id, "new instance", instance.Name, instance.IP, instance.Hostname)
setGauges()
return instance, nil
}
func (p *pwd) InstanceWriteToTerminal(instance *Instance, data string) {
if instance != nil && instance.conn != nil && len(data) > 0 {
instance.conn.Write([]byte(data))
}
}
func (p *pwd) InstanceAllowedImages() []string {
return []string{
config.GetDindImageName(),
"franela/dind:overlay2-dev",
}
}
func (p *pwd) InstanceExec(instance *Instance, cmd []string) (int, error) {
return p.docker.Exec(instance.Name, cmd)
}
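
Editor's note: the naming loop in InstanceNew above derives container names from the first eight characters of the session id plus an incrementing node name. A tiny sketch with an illustrative session id:

package main

import "fmt"

func main() {
	sessionId := "8dbc7e0a-93a2-4a0b-9f6b-0d84a2a1c0de" // illustrative only
	containerName := fmt.Sprintf("%s_%s", sessionId[:8], fmt.Sprintf("node%d", 1))
	fmt.Println(containerName) // 8dbc7e0a_node1
}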


@@ -1,41 +1,83 @@
package pwd
import (
"sync"
"time"
"github.com/play-with-docker/play-with-docker/docker"
"github.com/prometheus/client_golang/prometheus"
)
type Session struct {
rw sync.Mutex
Id string `json:"id"`
Instances map[string]*Instance `json:"instances"`
clients []*Client `json:"-"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
scheduled bool `json:"-"`
ticker *time.Ticker `json:"-"`
PwdIpAddress string `json:"pwd_ip_address"`
Ready bool `json:"ready"`
Stack string `json:"stack"`
closingTimer *time.Timer `json:"-"`
}
var (
sessionsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "sessions",
Help: "Sessions",
})
clientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "clients",
Help: "Clients",
})
instancesGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "instances",
Help: "Instances",
})
)
type Instance struct {
}
var sessions map[string]*Session
type Client struct {
func init() {
prometheus.MustRegister(sessionsGauge)
prometheus.MustRegister(clientsGauge)
prometheus.MustRegister(instancesGauge)
sessions = make(map[string]*Session)
}
type pwd struct {
docker docker.Docker `json:"-"`
docker docker.DockerApi
tasks SchedulerApi
broadcast BroadcastApi
storage StorageApi
}
type PWDApi interface {
NewSession(duration time.Duration, stack string) (*Session, error)
SessionNew(duration time.Duration, stack string, stackName string) (*Session, error)
SessionClose(session *Session) error
SessionGetSmallestViewPort(session *Session) ViewPort
SessionDeployStack(session *Session) error
SessionGet(id string) *Session
SessionLoadAndPrepare() error
InstanceNew(session *Session, conf InstanceConfig) (*Instance, error)
InstanceResizeTerminal(instance *Instance, cols, rows uint) error
InstanceAttachTerminal(instance *Instance) error
InstanceUploadFromUrl(instance *Instance, url string) error
InstanceGet(session *Session, name string) *Instance
InstanceFindByIP(ip string) *Instance
InstanceFindByAlias(sessionPrefix, alias string) *Instance
InstanceDelete(session *Session, instance *Instance) error
InstanceWriteToTerminal(instance *Instance, data string)
InstanceAllowedImages() []string
InstanceExec(instance *Instance, cmd []string) (int, error)
ClientNew(id string, session *Session) *Client
ClientResizeViewPort(client *Client, cols, rows uint)
ClientClose(client *Client)
}
func NewPWD(d docker.Docker) pwd {
return pwd{docker: d}
func NewPWD(d docker.DockerApi, t SchedulerApi, b BroadcastApi, s StorageApi) *pwd {
return &pwd{docker: d, tasks: t, broadcast: b, storage: s}
}
func setGauges() {
var ins float64
var cli float64
for _, s := range sessions {
ins += float64(len(s.Instances))
cli += float64(len(s.clients))
}
clientsGauge.Set(cli)
instancesGauge.Set(ins)
sessionsGauge.Set(float64(len(sessions)))
}
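
Editor's note: PWDApi is now assembled from four collaborators. A composition sketch, assumed caller code rather than part of this commit, using the constructors added elsewhere in this change (NewBroadcast, NewScheduler, NewStorage) and the repository's docker wrapper; the socket.io handlers, route and port are illustrative.

package main

import (
	"log"
	"net/http"

	"github.com/docker/docker/client"
	socketio "github.com/googollee/go-socket.io"
	"github.com/play-with-docker/play-with-docker/docker"
	"github.com/play-with-docker/play-with-docker/pwd"
)

func main() {
	// Engine client for the host daemon; NewEnvClient honours DOCKER_HOST et al.
	c, err := client.NewEnvClient()
	if err != nil {
		log.Fatal(err)
	}
	d := docker.NewDocker(c)

	b, err := pwd.NewBroadcast(
		func(so socketio.Socket) { log.Println("connected:", so.Id()) },
		func(so socketio.Socket, err error) { log.Println("socket.io error:", err) },
	)
	if err != nil {
		log.Fatal(err)
	}

	core := pwd.NewPWD(d, pwd.NewScheduler(b, d), b, pwd.NewStorage())

	// Reload any persisted sessions before serving traffic.
	if err := core.SessionLoadAndPrepare(); err != nil {
		log.Println(err)
	}

	http.Handle("/sessions/ws/", b.GetHandler())
	log.Fatal(http.ListenAndServe(":3000", nil))
}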


@@ -1,25 +1,58 @@
package pwd
import (
"fmt"
"log"
"math"
"path"
"strings"
"sync"
"time"
"github.com/franela/play-with-docker.old/config"
"github.com/play-with-docker/play-with-docker/config"
"github.com/twinj/uuid"
)
func (p *pwd) NewSession(duration time.Duration, stack, stackName string) (*Session, error) {
type sessionBuilderWriter struct {
sessionId string
broadcast BroadcastApi
}
func (s *sessionBuilderWriter) Write(p []byte) (n int, err error) {
s.broadcast.BroadcastTo(s.sessionId, "session builder out", string(p))
return len(p), nil
}
type Session struct {
rw sync.Mutex
Id string `json:"id"`
Instances map[string]*Instance `json:"instances"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
PwdIpAddress string `json:"pwd_ip_address"`
Ready bool `json:"ready"`
Stack string `json:"stack"`
StackName string `json:"stack_name"`
closingTimer *time.Timer `json:"-"`
scheduled bool `json:"-"`
clients []*Client `json:"-"`
ticker *time.Ticker `json:"-"`
}
func (p *pwd) SessionNew(duration time.Duration, stack, stackName string) (*Session, error) {
s := &Session{}
s.Id = uuid.NewV4().String()
s.Instances = map[string]*Instance{}
s.CreatedAt = time.Now()
s.ExpiresAt = s.CreatedAt.Add(duration)
/*
if stack == "" {
s.Ready = true
}
s.Stack = stack
*/
s.Ready = true
s.Stack = stack
s.StackName = stackName
if s.Stack != "" {
s.Ready = false
}
log.Printf("NewSession id=[%s]\n", s.Id)
if err := p.docker.CreateNetwork(s.Id); err != nil {
@@ -28,39 +61,192 @@ func (p *pwd) NewSession(duration time.Duration, stack, stackName string) (*Sess
}
log.Printf("Network [%s] created for session [%s]\n", s.Id, s.Id)
s.Prepare()
if err := p.prepareSession(s); err != nil {
log.Println(err)
return nil, err
}
sessions[s.Id] = s
if err := p.storage.Save(); err != nil {
log.Println(err)
return nil, err
}
setGauges()
return s, nil
}
func (p *pwd) SessionClose(s *Session) error {
s.rw.Lock()
defer s.rw.Unlock()
if s.ticker != nil {
s.ticker.Stop()
}
p.broadcast.BroadcastTo(s.Id, "session end")
p.broadcast.BroadcastTo(s.Id, "disconnect")
log.Printf("Starting clean up of session [%s]\n", s.Id)
for _, i := range s.Instances {
err := p.InstanceDelete(s, i)
if err != nil {
log.Println(err)
return err
}
}
// Disconnect PWD daemon from the network
if err := p.docker.DisconnectNetwork(config.PWDContainerName, s.Id); err != nil {
if !strings.Contains(err.Error(), "is not connected to the network") {
log.Println("ERROR NETWORKING")
return err
}
}
log.Printf("Disconnected pwd from network [%s]\n", s.Id)
if err := p.docker.DeleteNetwork(s.Id); err != nil {
if !strings.Contains(err.Error(), "not found") {
log.Println(err)
return err
}
}
delete(sessions, s.Id)
// Persist the sessions map as soon as a session is deleted
if err := p.storage.Save(); err != nil {
return err
}
setGauges()
log.Printf("Cleaned up session [%s]\n", s.Id)
return nil
}
func (p *pwd) SessionGetSmallestViewPort(s *Session) ViewPort {
minRows := s.clients[0].viewPort.Rows
minCols := s.clients[0].viewPort.Cols
for _, c := range s.clients {
minRows = uint(math.Min(float64(minRows), float64(c.viewPort.Rows)))
minCols = uint(math.Min(float64(minCols), float64(c.viewPort.Cols)))
}
return ViewPort{Rows: minRows, Cols: minCols}
}
func (p *pwd) SessionDeployStack(s *Session) error {
s.rw.Lock()
defer s.rw.Unlock()
if s.Ready {
// a stack was already deployed on this session, just ignore
return nil
}
s.Ready = false
p.broadcast.BroadcastTo(s.Id, "session ready", s.Ready)
i, err := p.InstanceNew(s, InstanceConfig{})
if err != nil {
log.Printf("Error creating instance for stack [%s]: %s\n", s.Stack, err)
return err
}
err = p.InstanceUploadFromUrl(i, "https://raw.githubusercontent.com/play-with-docker/stacks/master"+s.Stack)
if err != nil {
log.Printf("Error uploading stack file [%s]: %s\n", s.Stack, err)
return err
}
w := sessionBuilderWriter{sessionId: s.Id, broadcast: p.broadcast}
fileName := path.Base(s.Stack)
code, err := p.docker.ExecAttach(i.Name, []string{"docker-compose", "-f", "/var/run/pwd/uploads/" + fileName, "up", "-d"}, &w)
if err != nil {
log.Printf("Error executing stack [%s]: %s\n", s.Stack, err)
return err
}
log.Printf("Stack execution finished with code %d\n", code)
s.Ready = true
p.broadcast.BroadcastTo(s.Id, "session ready", s.Ready)
if err := p.storage.Save(); err != nil {
return err
}
return nil
}
func (p *pwd) SessionGet(sessionId string) *Session {
s := sessions[sessionId]
/*
if s != nil {
for _, instance := range s.Instances {
if !instance.IsConnected() {
instance.SetSession(s)
go instance.Attach()
}
}
}*/
return s
}
func (p *pwd) SessionLoadAndPrepare() error {
err := p.storage.Load()
if err != nil {
return err
}
for _, s := range sessions {
err := p.prepareSession(s)
if err != nil {
return err
}
for _, i := range s.Instances {
// wire the session back to the instance
i.session = s
go p.InstanceAttachTerminal(i)
}
// Connect PWD daemon to the new network
if s.PwdIpAddress == "" {
return fmt.Errorf("Cannot load stored sessions as they don't have the pwd ip address stored with them")
}
}
setGauges()
return nil
}
// This function should be called any time a session needs to be prepared:
// 1. When it is first created
// 2. When it is loaded back from storage
func (s *Session) Prepare() error {
s.scheduleSessionClose()
func (p *pwd) prepareSession(session *Session) error {
p.scheduleSessionClose(session)
// Connect PWD daemon to the new network
s.connectToNetwork()
if err := p.connectToNetwork(session); err != nil {
return err
}
// Schedule periodic tasks
p.tasks.Schedule(session)
return nil
}
func (s *Session) scheduleSessionClose() {
func (p *pwd) scheduleSessionClose(s *Session) {
timeLeft := s.ExpiresAt.Sub(time.Now())
s.closingTimer = time.AfterFunc(timeLeft, func() {
s.Close()
p.SessionClose(s)
})
}
func (s *Session) Close() {
}
func (s *Session) connectToNetwork() {
ip, err := ConnectNetwork(config.PWDContainerName, s.Id, "")
func (p *pwd) connectToNetwork(s *Session) error {
ip, err := p.docker.ConnectNetwork(config.PWDContainerName, s.Id, s.PwdIpAddress)
if err != nil {
log.Println("ERROR NETWORKING")
return nil, err
return err
}
s.PwdIpAddress = ip
log.Printf("Connected %s to network [%s]\n", config.PWDContainerName, s.Id)
return nil
}


@@ -4,22 +4,41 @@ import (
"testing"
"time"
"github.com/play-with-docker/play-with-docker/config"
"github.com/stretchr/testify/assert"
)
func TestNewSession_WithoutStack(t *testing.T) {
func TestSessionNew(t *testing.T) {
config.PWDContainerName = "pwd"
var connectContainerName, connectNetworkName, connectIP string
createdNetworkId := ""
mock := &mockDocker{}
mock.createNetwork = func(id string) error {
docker := &mockDocker{}
docker.createNetwork = func(id string) error {
createdNetworkId = id
return nil
}
docker.connectNetwork = func(containerName, networkName, ip string) (string, error) {
connectContainerName = containerName
connectNetworkName = networkName
connectIP = ip
return "10.0.0.1", nil
}
p := NewPWD(mock)
var scheduledSession *Session
tasks := &mockTasks{}
tasks.schedule = func(s *Session) {
scheduledSession = s
}
broadcast := &mockBroadcast{}
storage := &mockStorage{}
p := NewPWD(docker, tasks, broadcast, storage)
before := time.Now()
s, e := p.NewSession(time.Hour, "", "")
s, e := p.SessionNew(time.Hour, "", "")
assert.Nil(t, e)
assert.NotNil(t, s)
@@ -28,6 +47,21 @@ func TestNewSession_WithoutStack(t *testing.T) {
assert.WithinDuration(t, s.CreatedAt, before, time.Since(before))
assert.WithinDuration(t, s.ExpiresAt, before.Add(time.Hour), time.Second)
assert.Equal(t, s.Id, createdNetworkId)
assert.True(t, s.Ready)
s, _ = p.SessionNew(time.Hour, "stackPath", "stackName")
assert.Equal(t, "stackPath", s.Stack)
assert.Equal(t, "stackName", s.StackName)
assert.False(t, s.Ready)
assert.NotNil(t, s.closingTimer)
assert.Equal(t, config.PWDContainerName, connectContainerName)
assert.Equal(t, s.Id, connectNetworkName)
assert.Empty(t, connectIP)
assert.Equal(t, "10.0.0.1", s.PwdIpAddress)
assert.Equal(t, s, scheduledSession)
}

pwd/storage.go Normal file

@@ -0,0 +1,50 @@
package pwd
import (
"encoding/gob"
"os"
"sync"
"github.com/play-with-docker/play-with-docker/config"
)
type StorageApi interface {
Save() error
Load() error
}
type storage struct {
rw sync.Mutex
}
func (store *storage) Load() error {
file, err := os.Open(config.SessionsFile)
if err == nil {
decoder := gob.NewDecoder(file)
err = decoder.Decode(&sessions)
if err != nil {
return err
}
}
file.Close()
return nil
}
func (store *storage) Save() error {
store.rw.Lock()
defer store.rw.Unlock()
file, err := os.Create(config.SessionsFile)
if err == nil {
encoder := gob.NewEncoder(file)
err = encoder.Encode(&sessions)
}
file.Close()
return err
}
func NewStorage() *storage {
return &storage{}
}

pwd/storage_mock_test.go Normal file

@@ -0,0 +1,11 @@
package pwd
type mockStorage struct {
}
func (m *mockStorage) Save() error {
return nil
}
func (m *mockStorage) Load() error {
return nil
}

pwd/tasks.go Normal file

@@ -0,0 +1,118 @@
package pwd
import (
"crypto/tls"
"fmt"
"log"
"net"
"net/http"
"sort"
"strings"
"sync"
"time"
"github.com/docker/docker/api"
"github.com/docker/docker/client"
"github.com/docker/go-connections/tlsconfig"
"github.com/play-with-docker/play-with-docker/docker"
)
type periodicTask interface {
Run(i *Instance) error
}
type SchedulerApi interface {
Schedule(session *Session)
Unschedule(session *Session)
}
type scheduler struct {
broadcast BroadcastApi
periodicTasks []periodicTask
}
func (sch *scheduler) Schedule(s *Session) {
if s.scheduled {
return
}
go func() {
s.scheduled = true
s.ticker = time.NewTicker(1 * time.Second)
for range s.ticker.C {
var wg = sync.WaitGroup{}
wg.Add(len(s.Instances))
for _, ins := range s.Instances {
var i *Instance = ins
if i.docker == nil {
// Need to create client to the DinD docker daemon
// We check if the client needs to use TLS
var tlsConfig *tls.Config
if len(i.Cert) > 0 && len(i.Key) > 0 {
tlsConfig = tlsconfig.ClientDefault()
tlsConfig.InsecureSkipVerify = true
tlsCert, err := tls.X509KeyPair(i.Cert, i.Key)
if err != nil {
log.Println("Could not load X509 key pair: %v. Make sure the key is not encrypted", err)
continue
}
tlsConfig.Certificates = []tls.Certificate{tlsCert}
}
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 1 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext}
if tlsConfig != nil {
transport.TLSClientConfig = tlsConfig
}
cli := &http.Client{
Transport: transport,
}
c, err := client.NewClient(fmt.Sprintf("http://%s:2375", i.IP), api.DefaultVersion, cli, nil)
if err != nil {
log.Println("Could not connect to DinD docker daemon", err)
} else {
i.docker = docker.NewDocker(c)
}
}
go func() {
defer wg.Done()
for _, t := range sch.periodicTasks {
err := t.Run(i)
if err != nil {
if strings.Contains(err.Error(), "No such container") {
log.Printf("Container for instance [%s] doesn't exist any more.\n", i.IP)
//DeleteInstance(i.session, i)
} else {
log.Println(err)
}
break
}
}
}()
}
wg.Wait()
// broadcast all information
for _, ins := range s.Instances {
ins.Ports = UInt16Slice(ins.tempPorts)
sort.Sort(ins.Ports)
ins.tempPorts = []uint16{}
sch.broadcast.BroadcastTo(ins.session.Id, "instance stats", ins.Name, ins.Mem, ins.Cpu, ins.IsManager, ins.Ports)
}
}
}()
}
func (sch *scheduler) Unschedule(s *Session) {
}
func NewScheduler(b BroadcastApi, d docker.DockerApi) *scheduler {
s := &scheduler{broadcast: b}
s.periodicTasks = []periodicTask{&collectStatsTask{docker: d}, &checkSwarmStatusTask{}, &checkUsedPortsTask{}, &checkSwarmUsedPortsTask{}}
return s
}
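
Editor's note: each periodic task only has to implement Run(*Instance) error against the per-instance DinD client, and is registered by appending it to the slice built in NewScheduler above. An illustrative extra task, not part of the commit:

package pwd

import "log"

// logVersionTask is a hypothetical task showing the periodicTask shape;
// it asks the instance's own DinD daemon for its version.
type logVersionTask struct {
}

func (c logVersionTask) Run(i *Instance) error {
	info, err := i.docker.GetDaemonInfo()
	if err != nil {
		log.Println(err)
		return err
	}
	log.Printf("instance %s is running Docker %s\n", i.Name, info.ServerVersion)
	return nil
}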

pwd/tasks_mock_test.go Normal file

@@ -0,0 +1,17 @@
package pwd
type mockTasks struct {
schedule func(s *Session)
unschedule func(s *Session)
}
func (m *mockTasks) Schedule(s *Session) {
if m.schedule != nil {
m.schedule(s)
}
}
func (m *mockTasks) Unschedule(s *Session) {
if m.unschedule != nil {
m.unschedule(s)
}
}