Reimplement the scheduler to be a lot more reliable
This commit is contained in:
@@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
@@ -20,57 +18,89 @@ type Task interface {
|
||||
}
|
||||
|
||||
type SchedulerApi interface {
|
||||
Schedule(session *types.Session) error
|
||||
Unschedule(session *types.Session) error
|
||||
Start()
|
||||
Start() error
|
||||
Stop()
|
||||
AddTask(task Task) error
|
||||
RemoveTask(task Task) error
|
||||
}
|
||||
|
||||
type scheduledSession struct {
|
||||
session *types.Session
|
||||
cancel context.CancelFunc
|
||||
ticker *time.Ticker
|
||||
busy int32
|
||||
}
|
||||
|
||||
type scheduledInstance struct {
|
||||
instance *types.Instance
|
||||
ticker *time.Ticker
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
type scheduler struct {
|
||||
scheduledSessions map[string]*scheduledSession
|
||||
storage storage.StorageApi
|
||||
event event.EventApi
|
||||
pwd pwd.PWDApi
|
||||
tasks map[string]Task
|
||||
started bool
|
||||
scheduledSessions map[string]*scheduledSession
|
||||
scheduledInstances map[string]*scheduledInstance
|
||||
tasks map[string]Task
|
||||
started bool
|
||||
|
||||
storage storage.StorageApi
|
||||
event event.EventApi
|
||||
pwd pwd.PWDApi
|
||||
}
|
||||
|
||||
func NewScheduler(s storage.StorageApi, e event.EventApi, p pwd.PWDApi) (*scheduler, error) {
|
||||
func NewScheduler(tasks []Task, s storage.StorageApi, e event.EventApi, p pwd.PWDApi) (*scheduler, error) {
|
||||
sch := &scheduler{storage: s, event: e, pwd: p}
|
||||
|
||||
sch.tasks = make(map[string]Task)
|
||||
sch.scheduledSessions = make(map[string]*scheduledSession)
|
||||
sch.scheduledInstances = make(map[string]*scheduledInstance)
|
||||
|
||||
err := sch.loadFromStorage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for _, task := range tasks {
|
||||
if err := sch.addTask(task); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return sch, nil
|
||||
}
|
||||
|
||||
func (s *scheduler) loadFromStorage() error {
|
||||
sessions, err := s.storage.SessionGetAll()
|
||||
if err != nil {
|
||||
return err
|
||||
func (s *scheduler) processSession(ctx context.Context, ss *scheduledSession) {
|
||||
defer s.unscheduleSession(ss.session)
|
||||
select {
|
||||
case <-time.After(time.Until(ss.session.ExpiresAt)):
|
||||
// Session has expired. Need to close the session.
|
||||
s.pwd.SessionClose(ss.session)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
for _, session := range sessions {
|
||||
s.register(session)
|
||||
}
|
||||
func (s *scheduler) processInstance(ctx context.Context, si *scheduledInstance) {
|
||||
defer s.unscheduleInstance(si.instance)
|
||||
for {
|
||||
select {
|
||||
case <-si.ticker.C:
|
||||
// First check if instance still exists
|
||||
_, err := s.storage.InstanceGet(si.instance.Name)
|
||||
if err != nil {
|
||||
if storage.NotFound(err) {
|
||||
// Instance doesn't exists anymore. Unschedule.
|
||||
log.Printf("Instance %s doesn't exists in storage.\n", si.instance.Name)
|
||||
return
|
||||
}
|
||||
log.Printf("Error retrieving instance %s from storage. Got: %v\n", si.instance.Name, err)
|
||||
continue
|
||||
}
|
||||
for _, task := range s.tasks {
|
||||
err := task.Run(ctx, si.instance)
|
||||
if err != nil {
|
||||
log.Printf("Error running task %s on instance %s. Got: %v\n", task.Name(), si.instance.Name, err)
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
log.Printf("Processing tasks for instance %s has been canceled.\n", si.instance.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scheduler) AddTask(task Task) error {
|
||||
func (s *scheduler) addTask(task Task) error {
|
||||
if _, found := s.tasks[task.Name()]; found {
|
||||
return fmt.Errorf("Task [%s] was already added", task.Name())
|
||||
}
|
||||
@@ -79,148 +109,110 @@ func (s *scheduler) AddTask(task Task) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scheduler) RemoveTask(task Task) error {
|
||||
if _, found := s.tasks[task.Name()]; !found {
|
||||
return fmt.Errorf("Task [%s] doesn't exist", task.Name())
|
||||
func (s *scheduler) unscheduleSession(session *types.Session) {
|
||||
ss, found := s.scheduledSessions[session.Id]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
delete(s.tasks, task.Name())
|
||||
|
||||
return nil
|
||||
ss.cancel()
|
||||
delete(s.scheduledSessions, ss.session.Id)
|
||||
log.Printf("Unscheduled session %s\n", session.Id)
|
||||
}
|
||||
func (s *scheduler) scheduleSession(session *types.Session) {
|
||||
if _, found := s.scheduledSessions[session.Id]; found {
|
||||
log.Printf("Session %s is already scheduled. Ignoring.\n", session.Id)
|
||||
return
|
||||
}
|
||||
ss := &scheduledSession{session: session}
|
||||
s.scheduledSessions[session.Id] = ss
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ss.cancel = cancel
|
||||
go s.processSession(ctx, ss)
|
||||
log.Printf("Scheduled session %s\n", session.Id)
|
||||
}
|
||||
func (s *scheduler) unscheduleInstance(instance *types.Instance) {
|
||||
si, found := s.scheduledInstances[instance.Name]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
si.cancel()
|
||||
si.ticker.Stop()
|
||||
delete(s.scheduledInstances, si.instance.Name)
|
||||
log.Printf("Unscheduled instance %s\n", instance.Name)
|
||||
}
|
||||
func (s *scheduler) scheduleInstance(instance *types.Instance) {
|
||||
if _, found := s.scheduledInstances[instance.Name]; found {
|
||||
log.Printf("Instance %s is already scheduled. Ignoring.\n", instance.Name)
|
||||
return
|
||||
}
|
||||
si := &scheduledInstance{instance: instance}
|
||||
s.scheduledInstances[instance.Name] = si
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
si.cancel = cancel
|
||||
si.ticker = time.NewTicker(time.Second)
|
||||
go s.processInstance(ctx, si)
|
||||
log.Printf("Scheduled instance %s\n", instance.Name)
|
||||
}
|
||||
|
||||
func (s *scheduler) Stop() {
|
||||
for _, session := range s.scheduledSessions {
|
||||
s.Unschedule(session.session)
|
||||
for _, ss := range s.scheduledSessions {
|
||||
s.unscheduleSession(ss.session)
|
||||
}
|
||||
for _, si := range s.scheduledInstances {
|
||||
s.unscheduleInstance(si.instance)
|
||||
}
|
||||
s.started = false
|
||||
}
|
||||
|
||||
func (s *scheduler) Start() {
|
||||
for _, session := range s.scheduledSessions {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
session.cancel = cancel
|
||||
session.ticker = time.NewTicker(1 * time.Second)
|
||||
go s.cron(ctx, session)
|
||||
func (s *scheduler) Start() error {
|
||||
sessions, err := s.storage.SessionGetAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, session := range sessions {
|
||||
s.scheduleSession(session)
|
||||
|
||||
instances, err := s.storage.InstanceFindBySessionId(session.Id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, instance := range instances {
|
||||
s.scheduleInstance(instance)
|
||||
}
|
||||
}
|
||||
s.event.On(event.SESSION_NEW, func(sessionId string, args ...interface{}) {
|
||||
log.Printf("EVENT: Session New %s\n", sessionId)
|
||||
session, err := s.storage.SessionGet(sessionId)
|
||||
if err != nil {
|
||||
log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err)
|
||||
return
|
||||
}
|
||||
s.Schedule(session)
|
||||
s.scheduleSession(session)
|
||||
})
|
||||
s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) {
|
||||
log.Printf("EVENT: Session End %s\n", sessionId)
|
||||
session := &types.Session{Id: sessionId}
|
||||
err := s.Unschedule(session)
|
||||
s.unscheduleSession(session)
|
||||
})
|
||||
s.event.On(event.INSTANCE_NEW, func(sessionId string, args ...interface{}) {
|
||||
instanceName := args[0].(string)
|
||||
log.Printf("EVENT: Instance New %s\n", instanceName)
|
||||
instance, err := s.storage.InstanceGet(instanceName)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
log.Printf("Instance [%s] was not found in storage. Got %s\n", instanceName, err)
|
||||
return
|
||||
}
|
||||
s.scheduleInstance(instance)
|
||||
})
|
||||
s.event.On(event.INSTANCE_DELETE, func(sessionId string, args ...interface{}) {
|
||||
instanceName := args[0].(string)
|
||||
log.Printf("EVENT: Instance Delete %s\n", instanceName)
|
||||
instance := &types.Instance{Name: instanceName}
|
||||
s.unscheduleInstance(instance)
|
||||
})
|
||||
s.started = true
|
||||
}
|
||||
|
||||
func (s *scheduler) register(session *types.Session) *scheduledSession {
|
||||
ss := &scheduledSession{session: session, busy: 0}
|
||||
s.scheduledSessions[session.Id] = ss
|
||||
return ss
|
||||
}
|
||||
|
||||
func (s *scheduler) cron(ctx context.Context, session *scheduledSession) {
|
||||
for {
|
||||
select {
|
||||
case <-session.ticker.C:
|
||||
if atomic.CompareAndSwapInt32(&session.busy, 0, 1) {
|
||||
if time.Now().After(session.session.ExpiresAt) {
|
||||
// Session has expired. Need to close the session.
|
||||
s.pwd.SessionClose(session.session)
|
||||
return
|
||||
} else {
|
||||
s.processSession(ctx, session.session)
|
||||
}
|
||||
atomic.StoreInt32(&session.busy, 0)
|
||||
} else {
|
||||
log.Printf("Session [%s] is currently busy. Will try next time.\n", session.session.Id)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scheduler) processSession(ctx context.Context, session *types.Session) {
|
||||
updatedSession, err := s.storage.SessionGet(session.Id)
|
||||
if err != nil {
|
||||
if storage.NotFound(err) {
|
||||
log.Printf("Session [%s] was not found in storage. Unscheduling.\n", session.Id)
|
||||
s.Unschedule(session)
|
||||
} else {
|
||||
log.Printf("Cannot process session. Got %s\n", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
instances, err := s.storage.InstanceFindBySessionId(updatedSession.Id)
|
||||
if err != nil {
|
||||
log.Printf("Couldn't find instances for session [%s]. Got: %v\n", updatedSession.Id, err)
|
||||
return
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(instances))
|
||||
for _, ins := range instances {
|
||||
go func(ins *types.Instance) {
|
||||
s.processInstance(ctx, ins)
|
||||
wg.Done()
|
||||
}(ins)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *scheduler) processInstance(ctx context.Context, instance *types.Instance) {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(s.tasks))
|
||||
for _, task := range s.tasks {
|
||||
go func(task Task) {
|
||||
task.Run(ctx, instance)
|
||||
wg.Done()
|
||||
}(task)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *scheduler) Schedule(session *types.Session) error {
|
||||
if !s.started {
|
||||
return fmt.Errorf("Can only schedule sessions after the scheduler has been started.")
|
||||
}
|
||||
if _, found := s.scheduledSessions[session.Id]; found {
|
||||
return fmt.Errorf("Session [%s] was already scheduled", session.Id)
|
||||
}
|
||||
scheduledSession := s.register(session)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
scheduledSession.cancel = cancel
|
||||
scheduledSession.ticker = time.NewTicker(1 * time.Second)
|
||||
go s.cron(ctx, scheduledSession)
|
||||
|
||||
log.Printf("Scheduled session [%s]\n", session.Id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scheduler) Unschedule(session *types.Session) error {
|
||||
if !s.started {
|
||||
return fmt.Errorf("Can only schedule sessions after the scheduler has been started.")
|
||||
}
|
||||
if _, found := s.scheduledSessions[session.Id]; !found {
|
||||
return fmt.Errorf("Session [%s] in not scheduled", session.Id)
|
||||
}
|
||||
|
||||
scheduledSession := s.scheduledSessions[session.Id]
|
||||
scheduledSession.cancel()
|
||||
scheduledSession.ticker.Stop()
|
||||
delete(s.scheduledSessions, session.Id)
|
||||
|
||||
log.Printf("Unscheduled session [%s]\n", session.Id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user