diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 1f88f4b..47ad57b 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "log" + "sync" + "sync/atomic" "time" "github.com/play-with-docker/play-with-docker/event" @@ -30,6 +32,7 @@ type scheduledSession struct { session *types.Session cancel context.CancelFunc ticker *time.Ticker + busy int32 } type scheduler struct { @@ -119,7 +122,7 @@ func (s *scheduler) Start() { } func (s *scheduler) register(session *types.Session) *scheduledSession { - ss := &scheduledSession{session: session} + ss := &scheduledSession{session: session, busy: 0} s.scheduledSessions[session.Id] = ss return ss } @@ -128,12 +131,17 @@ func (s *scheduler) cron(ctx context.Context, session *scheduledSession) { for { select { case <-session.ticker.C: - if time.Now().After(session.session.ExpiresAt) { - // Session has expired. Need to close the session. - s.pwd.SessionClose(session.session) - return + if atomic.CompareAndSwapInt32(&session.busy, 0, 1) { + if time.Now().After(session.session.ExpiresAt) { + // Session has expired. Need to close the session. + s.pwd.SessionClose(session.session) + return + } else { + s.processSession(ctx, session.session) + } + atomic.StoreInt32(&session.busy, 0) } else { - s.processSession(ctx, session.session) + log.Printf("Session [%s] is currently busy. Will try next time.\n", session.session.Id) } case <-ctx.Done(): return @@ -153,15 +161,27 @@ func (s *scheduler) processSession(ctx context.Context, session *types.Session) return } + wg := sync.WaitGroup{} + wg.Add(len(updatedSession.Instances)) for _, ins := range updatedSession.Instances { - go s.processInstance(ctx, ins) + go func(ins *types.Instance) { + s.processInstance(ctx, ins) + wg.Done() + }(ins) } + wg.Wait() } func (s *scheduler) processInstance(ctx context.Context, instance *types.Instance) { + wg := sync.WaitGroup{} + wg.Add(len(s.tasks)) for _, task := range s.tasks { - go task.Run(ctx, instance) + go func(task Task) { + task.Run(ctx, instance) + wg.Done() + }(task) } + wg.Wait() } func (s *scheduler) Schedule(session *types.Session) error {