Don't have parallel execution of task if it takes longer than interval

time
This commit is contained in:
Jonathan Leibiusky @xetorthio
2017-08-28 11:53:12 -03:00
parent 26724c51be
commit beec1628be

View File

@@ -4,6 +4,8 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"sync"
"sync/atomic"
"time" "time"
"github.com/play-with-docker/play-with-docker/event" "github.com/play-with-docker/play-with-docker/event"
@@ -30,6 +32,7 @@ type scheduledSession struct {
session *types.Session session *types.Session
cancel context.CancelFunc cancel context.CancelFunc
ticker *time.Ticker ticker *time.Ticker
busy int32
} }
type scheduler struct { type scheduler struct {
@@ -119,7 +122,7 @@ func (s *scheduler) Start() {
} }
func (s *scheduler) register(session *types.Session) *scheduledSession { func (s *scheduler) register(session *types.Session) *scheduledSession {
ss := &scheduledSession{session: session} ss := &scheduledSession{session: session, busy: 0}
s.scheduledSessions[session.Id] = ss s.scheduledSessions[session.Id] = ss
return ss return ss
} }
@@ -128,12 +131,17 @@ func (s *scheduler) cron(ctx context.Context, session *scheduledSession) {
for { for {
select { select {
case <-session.ticker.C: case <-session.ticker.C:
if time.Now().After(session.session.ExpiresAt) { if atomic.CompareAndSwapInt32(&session.busy, 0, 1) {
// Session has expired. Need to close the session. if time.Now().After(session.session.ExpiresAt) {
s.pwd.SessionClose(session.session) // Session has expired. Need to close the session.
return s.pwd.SessionClose(session.session)
return
} else {
s.processSession(ctx, session.session)
}
atomic.StoreInt32(&session.busy, 0)
} else { } else {
s.processSession(ctx, session.session) log.Printf("Session [%s] is currently busy. Will try next time.\n", session.session.Id)
} }
case <-ctx.Done(): case <-ctx.Done():
return return
@@ -153,15 +161,27 @@ func (s *scheduler) processSession(ctx context.Context, session *types.Session)
return return
} }
wg := sync.WaitGroup{}
wg.Add(len(updatedSession.Instances))
for _, ins := range updatedSession.Instances { for _, ins := range updatedSession.Instances {
go s.processInstance(ctx, ins) go func(ins *types.Instance) {
s.processInstance(ctx, ins)
wg.Done()
}(ins)
} }
wg.Wait()
} }
func (s *scheduler) processInstance(ctx context.Context, instance *types.Instance) { func (s *scheduler) processInstance(ctx context.Context, instance *types.Instance) {
wg := sync.WaitGroup{}
wg.Add(len(s.tasks))
for _, task := range s.tasks { for _, task := range s.tasks {
go task.Run(ctx, instance) go func(task Task) {
task.Run(ctx, instance)
wg.Done()
}(task)
} }
wg.Wait()
} }
func (s *scheduler) Schedule(session *types.Session) error { func (s *scheduler) Schedule(session *types.Session) error {