Storage has now it's own package.

Remove global `sessions` map and use configured storage.
Add a `types` package so both `pwd` and `storage` can access without
circular dependencies.
Now the session is prepared when requested and not on load.
This commit is contained in:
Jonathan Leibiusky @xetorthio
2017-06-14 18:35:21 -03:00
parent 2eff799c58
commit e9911abf94
22 changed files with 814 additions and 499 deletions

View File

@@ -11,6 +11,7 @@ import (
"github.com/play-with-docker/play-with-docker/config"
"github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/twinj/uuid"
)
@@ -35,32 +36,12 @@ type SessionSetupInstanceConf struct {
IsSwarmWorker bool `json:"is_swarm_worker"`
}
type Session struct {
rw sync.Mutex
Id string `json:"id"`
Instances map[string]*Instance `json:"instances"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
PwdIpAddress string `json:"pwd_ip_address"`
Ready bool `json:"ready"`
Stack string `json:"stack"`
StackName string `json:"stack_name"`
ImageName string `json:"image_name"`
closingTimer *time.Timer `json:"-"`
scheduled bool `json:"-"`
clients []*Client `json:"-"`
ticker *time.Ticker `json:"-"`
}
func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName string) (*Session, error) {
func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName string) (*types.Session, error) {
defer observeAction("SessionNew", time.Now())
sessionsMutex.Lock()
defer sessionsMutex.Unlock()
s := &Session{}
s := &types.Session{}
s.Id = uuid.NewV4().String()
s.Instances = map[string]*Instance{}
s.Instances = map[string]*types.Instance{}
s.CreatedAt = time.Now()
s.ExpiresAt = s.CreatedAt.Add(duration)
s.Ready = true
@@ -88,24 +69,24 @@ func (p *pwd) SessionNew(duration time.Duration, stack, stackName, imageName str
return nil, err
}
sessions[s.Id] = s
if err := p.storage.Save(); err != nil {
if err := p.storage.SessionPut(s); err != nil {
log.Println(err)
return nil, err
}
setGauges()
p.setGauges()
return s, nil
}
func (p *pwd) SessionClose(s *Session) error {
s.rw.Lock()
defer s.rw.Unlock()
func (p *pwd) SessionClose(s *types.Session) error {
defer observeAction("SessionClose", time.Now())
s.Lock()
defer s.Unlock()
s.StopTicker()
if s.ticker != nil {
s.ticker.Stop()
}
p.broadcast.BroadcastTo(s.Id, "session end")
p.broadcast.BroadcastTo(s.Id, "disconnect")
log.Printf("Starting clean up of session [%s]\n", s.Id)
@@ -130,32 +111,35 @@ func (p *pwd) SessionClose(s *Session) error {
return err
}
}
delete(sessions, s.Id)
// We store sessions as soon as we delete one
if err := p.storage.Save(); err != nil {
err := p.storage.SessionDelete(s.Id)
if err != nil {
return err
}
setGauges()
log.Printf("Cleaned up session [%s]\n", s.Id)
p.setGauges()
return nil
}
func (p *pwd) SessionGetSmallestViewPort(s *Session) ViewPort {
minRows := s.clients[0].viewPort.Rows
minCols := s.clients[0].viewPort.Cols
func (p *pwd) SessionGetSmallestViewPort(s *types.Session) types.ViewPort {
defer observeAction("SessionGetSmallestViewPort", time.Now())
for _, c := range s.clients {
minRows = uint(math.Min(float64(minRows), float64(c.viewPort.Rows)))
minCols = uint(math.Min(float64(minCols), float64(c.viewPort.Cols)))
minRows := s.Clients[0].ViewPort.Rows
minCols := s.Clients[0].ViewPort.Cols
for _, c := range s.Clients {
minRows = uint(math.Min(float64(minRows), float64(c.ViewPort.Rows)))
minCols = uint(math.Min(float64(minCols), float64(c.ViewPort.Cols)))
}
return ViewPort{Rows: minRows, Cols: minCols}
return types.ViewPort{Rows: minRows, Cols: minCols}
}
func (p *pwd) SessionDeployStack(s *Session) error {
func (p *pwd) SessionDeployStack(s *types.Session) error {
defer observeAction("SessionDeployStack", time.Now())
if s.Ready {
// a stack was already deployed on this session, just ignore
return nil
@@ -188,60 +172,28 @@ func (p *pwd) SessionDeployStack(s *Session) error {
log.Printf("Stack execution finished with code %d\n", code)
s.Ready = true
p.broadcast.BroadcastTo(s.Id, "session ready", true)
if err := p.storage.Save(); err != nil {
if err := p.storage.SessionPut(s); err != nil {
return err
}
return nil
}
func (p *pwd) SessionGet(sessionId string) *Session {
func (p *pwd) SessionGet(sessionId string) *types.Session {
defer observeAction("SessionGet", time.Now())
s := sessions[sessionId]
s, _ := p.storage.SessionGet(sessionId)
if err := p.prepareSession(s); err != nil {
log.Println(err)
return nil
}
return s
}
func (p *pwd) SessionLoadAndPrepare() error {
defer observeAction("SessionLoadAndPrepare", time.Now())
err := p.storage.Load()
if err != nil {
return err
}
wg := sync.WaitGroup{}
for _, s := range sessions {
// Connect PWD daemon to the new network
if s.PwdIpAddress == "" {
log.Printf("Cannot load store session [%s] as they don't have the pwd ip address stored with them\n", s.Id)
continue
}
wg.Add(1)
go func(s *Session) {
s.rw.Lock()
defer s.rw.Unlock()
defer wg.Done()
err := p.prepareSession(s)
if err != nil {
log.Println(err)
}
for _, i := range s.Instances {
// wire the session back to the instance
i.session = s
go p.InstanceAttachTerminal(i)
}
}(s)
}
wg.Wait()
setGauges()
return nil
}
func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
func (p *pwd) SessionSetup(session *types.Session, conf SessionSetupConf) error {
defer observeAction("SessionSetup", time.Now())
var tokens *docker.SwarmTokens = nil
var firstSwarmManager *Instance = nil
var firstSwarmManager *types.Instance = nil
// first look for a swarm manager and create it
for _, conf := range conf.Instances {
@@ -254,14 +206,14 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
if err != nil {
return err
}
if i.docker == nil {
if i.Docker == nil {
dock, err := p.docker.New(i.IP, i.Cert, i.Key)
if err != nil {
return err
}
i.docker = dock
i.Docker = dock
}
tkns, err := i.docker.SwarmInit()
tkns, err := i.Docker.SwarmInit()
if err != nil {
return err
}
@@ -290,13 +242,13 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
}
if c.IsSwarmManager || c.IsSwarmWorker {
// check if we have connection to the daemon, if not, create it
if i.docker == nil {
if i.Docker == nil {
dock, err := p.docker.New(i.IP, i.Cert, i.Key)
if err != nil {
log.Println(err)
return
}
i.docker = dock
i.Docker = dock
}
}
@@ -304,7 +256,7 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
if c.IsSwarmManager {
// this is a swarm manager
// cluster has already been initiated, join as manager
err := i.docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager)
err := i.Docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager)
if err != nil {
log.Println(err)
return
@@ -312,7 +264,7 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
}
if c.IsSwarmWorker {
// this is a swarm worker
err := i.docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker)
err := i.Docker.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker)
if err != nil {
log.Println(err)
return
@@ -330,7 +282,14 @@ func (p *pwd) SessionSetup(session *Session, conf SessionSetupConf) error {
// This function should be called any time a session needs to be prepared:
// 1. Like when it is created
// 2. When it was loaded from storage
func (p *pwd) prepareSession(session *Session) error {
func (p *pwd) prepareSession(session *types.Session) error {
session.Lock()
defer session.Unlock()
if session.IsPrepared() {
return nil
}
p.scheduleSessionClose(session)
// Connect PWD daemon to the new network
@@ -341,17 +300,24 @@ func (p *pwd) prepareSession(session *Session) error {
// Schedule periodic tasks
p.tasks.Schedule(session)
for _, i := range session.Instances {
// wire the session back to the instance
i.Session = session
go p.InstanceAttachTerminal(i)
}
session.SetPrepared()
return nil
}
func (p *pwd) scheduleSessionClose(s *Session) {
func (p *pwd) scheduleSessionClose(s *types.Session) {
timeLeft := s.ExpiresAt.Sub(time.Now())
s.closingTimer = time.AfterFunc(timeLeft, func() {
s.SetClosingTimer(time.AfterFunc(timeLeft, func() {
p.SessionClose(s)
})
}))
}
func (p *pwd) connectToNetwork(s *Session) error {
func (p *pwd) connectToNetwork(s *types.Session) error {
ip, err := p.docker.ConnectNetwork(config.PWDContainerName, s.Id, s.PwdIpAddress)
if err != nil {
log.Println("ERROR NETWORKING")