From d566e5ee05a7e3f35af914ad89a5831a044b37ba Mon Sep 17 00:00:00 2001 From: "Jonathan Leibiusky @xetorthio" Date: Wed, 13 Sep 2017 19:20:21 -0300 Subject: [PATCH] Reimplement the scheduler to be a lot more reliable --- api.go | 13 +- pwd/session.go | 4 +- scheduler/scheduler.go | 284 ++++++++++++++++++------------------ scheduler/scheduler_test.go | 207 -------------------------- 4 files changed, 148 insertions(+), 360 deletions(-) delete mode 100644 scheduler/scheduler_test.go diff --git a/api.go b/api.go index be9eaa9..d7fa486 100644 --- a/api.go +++ b/api.go @@ -27,16 +27,17 @@ func main() { core := pwd.NewPWD(f, e, s, sp, ipf) - sch, err := scheduler.NewScheduler(s, e, core) + tasks := []scheduler.Task{ + task.NewCheckPorts(e, f), + task.NewCheckSwarmPorts(e, f), + task.NewCheckSwarmStatus(e, f), + task.NewCollectStats(e, f), + } + sch, err := scheduler.NewScheduler(tasks, s, e, core) if err != nil { log.Fatal("Error initializing the scheduler: ", err) } - sch.AddTask(task.NewCheckPorts(e, f)) - sch.AddTask(task.NewCheckSwarmPorts(e, f)) - sch.AddTask(task.NewCheckSwarmStatus(e, f)) - sch.AddTask(task.NewCollectStats(e, f)) - sch.Start() handlers.Bootstrap(core, e) diff --git a/pwd/session.go b/pwd/session.go index bb7fe48..8ece8a1 100644 --- a/pwd/session.go +++ b/pwd/session.go @@ -247,6 +247,7 @@ func (p *pwd) SessionSetup(session *types.Session, sconf SessionSetupConf) error if firstSwarmManager == nil { tkns, err := dockerClient.SwarmInit(i.IP) if err != nil { + log.Printf("Cannot initialize swarm on instance %s. Got: %v\n", i.Name, err) return err } tokens = tkns @@ -256,6 +257,7 @@ func (p *pwd) SessionSetup(session *types.Session, sconf SessionSetupConf) error } else { c.L.Unlock() if err := dockerClient.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Manager); err != nil { + log.Printf("Cannot join manager %s to swarm. Got: %v\n", i.Name, err) return err } } @@ -267,7 +269,7 @@ func (p *pwd) SessionSetup(session *types.Session, sconf SessionSetupConf) error c.L.Unlock() err = dockerClient.SwarmJoin(fmt.Sprintf("%s:2377", firstSwarmManager.IP), tokens.Worker) if err != nil { - log.Println(err) + log.Printf("Cannot join worker %s to swarm. Got: %v\n", i.Name, err) return err } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5a7d58b..c593f06 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "log" - "sync" - "sync/atomic" "time" "github.com/play-with-docker/play-with-docker/event" @@ -20,57 +18,89 @@ type Task interface { } type SchedulerApi interface { - Schedule(session *types.Session) error - Unschedule(session *types.Session) error - Start() + Start() error Stop() - AddTask(task Task) error - RemoveTask(task Task) error } type scheduledSession struct { session *types.Session cancel context.CancelFunc - ticker *time.Ticker - busy int32 +} + +type scheduledInstance struct { + instance *types.Instance + ticker *time.Ticker + cancel context.CancelFunc } type scheduler struct { - scheduledSessions map[string]*scheduledSession - storage storage.StorageApi - event event.EventApi - pwd pwd.PWDApi - tasks map[string]Task - started bool + scheduledSessions map[string]*scheduledSession + scheduledInstances map[string]*scheduledInstance + tasks map[string]Task + started bool + + storage storage.StorageApi + event event.EventApi + pwd pwd.PWDApi } -func NewScheduler(s storage.StorageApi, e event.EventApi, p pwd.PWDApi) (*scheduler, error) { +func NewScheduler(tasks []Task, s storage.StorageApi, e event.EventApi, p pwd.PWDApi) (*scheduler, error) { sch := &scheduler{storage: s, event: e, pwd: p} sch.tasks = make(map[string]Task) sch.scheduledSessions = make(map[string]*scheduledSession) + sch.scheduledInstances = make(map[string]*scheduledInstance) - err := sch.loadFromStorage() - if err != nil { - return nil, err + for _, task := range tasks { + if err := sch.addTask(task); err != nil { + return nil, err + } } return sch, nil } -func (s *scheduler) loadFromStorage() error { - sessions, err := s.storage.SessionGetAll() - if err != nil { - return err +func (s *scheduler) processSession(ctx context.Context, ss *scheduledSession) { + defer s.unscheduleSession(ss.session) + select { + case <-time.After(time.Until(ss.session.ExpiresAt)): + // Session has expired. Need to close the session. + s.pwd.SessionClose(ss.session) + return + case <-ctx.Done(): + return } - for _, session := range sessions { - s.register(session) +} +func (s *scheduler) processInstance(ctx context.Context, si *scheduledInstance) { + defer s.unscheduleInstance(si.instance) + for { + select { + case <-si.ticker.C: + // First check if instance still exists + _, err := s.storage.InstanceGet(si.instance.Name) + if err != nil { + if storage.NotFound(err) { + // Instance doesn't exists anymore. Unschedule. + log.Printf("Instance %s doesn't exists in storage.\n", si.instance.Name) + return + } + log.Printf("Error retrieving instance %s from storage. Got: %v\n", si.instance.Name, err) + continue + } + for _, task := range s.tasks { + err := task.Run(ctx, si.instance) + if err != nil { + log.Printf("Error running task %s on instance %s. Got: %v\n", task.Name(), si.instance.Name, err) + } + } + case <-ctx.Done(): + log.Printf("Processing tasks for instance %s has been canceled.\n", si.instance.Name) + return + } } - - return nil } -func (s *scheduler) AddTask(task Task) error { +func (s *scheduler) addTask(task Task) error { if _, found := s.tasks[task.Name()]; found { return fmt.Errorf("Task [%s] was already added", task.Name()) } @@ -79,148 +109,110 @@ func (s *scheduler) AddTask(task Task) error { return nil } -func (s *scheduler) RemoveTask(task Task) error { - if _, found := s.tasks[task.Name()]; !found { - return fmt.Errorf("Task [%s] doesn't exist", task.Name()) +func (s *scheduler) unscheduleSession(session *types.Session) { + ss, found := s.scheduledSessions[session.Id] + if !found { + return } - delete(s.tasks, task.Name()) - return nil + ss.cancel() + delete(s.scheduledSessions, ss.session.Id) + log.Printf("Unscheduled session %s\n", session.Id) +} +func (s *scheduler) scheduleSession(session *types.Session) { + if _, found := s.scheduledSessions[session.Id]; found { + log.Printf("Session %s is already scheduled. Ignoring.\n", session.Id) + return + } + ss := &scheduledSession{session: session} + s.scheduledSessions[session.Id] = ss + ctx, cancel := context.WithCancel(context.Background()) + ss.cancel = cancel + go s.processSession(ctx, ss) + log.Printf("Scheduled session %s\n", session.Id) +} +func (s *scheduler) unscheduleInstance(instance *types.Instance) { + si, found := s.scheduledInstances[instance.Name] + if !found { + return + } + si.cancel() + si.ticker.Stop() + delete(s.scheduledInstances, si.instance.Name) + log.Printf("Unscheduled instance %s\n", instance.Name) +} +func (s *scheduler) scheduleInstance(instance *types.Instance) { + if _, found := s.scheduledInstances[instance.Name]; found { + log.Printf("Instance %s is already scheduled. Ignoring.\n", instance.Name) + return + } + si := &scheduledInstance{instance: instance} + s.scheduledInstances[instance.Name] = si + ctx, cancel := context.WithCancel(context.Background()) + si.cancel = cancel + si.ticker = time.NewTicker(time.Second) + go s.processInstance(ctx, si) + log.Printf("Scheduled instance %s\n", instance.Name) } func (s *scheduler) Stop() { - for _, session := range s.scheduledSessions { - s.Unschedule(session.session) + for _, ss := range s.scheduledSessions { + s.unscheduleSession(ss.session) + } + for _, si := range s.scheduledInstances { + s.unscheduleInstance(si.instance) } s.started = false } -func (s *scheduler) Start() { - for _, session := range s.scheduledSessions { - ctx, cancel := context.WithCancel(context.Background()) - session.cancel = cancel - session.ticker = time.NewTicker(1 * time.Second) - go s.cron(ctx, session) +func (s *scheduler) Start() error { + sessions, err := s.storage.SessionGetAll() + if err != nil { + return err + } + for _, session := range sessions { + s.scheduleSession(session) + + instances, err := s.storage.InstanceFindBySessionId(session.Id) + if err != nil { + return err + } + + for _, instance := range instances { + s.scheduleInstance(instance) + } } s.event.On(event.SESSION_NEW, func(sessionId string, args ...interface{}) { + log.Printf("EVENT: Session New %s\n", sessionId) session, err := s.storage.SessionGet(sessionId) if err != nil { log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err) return } - s.Schedule(session) + s.scheduleSession(session) }) s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) { + log.Printf("EVENT: Session End %s\n", sessionId) session := &types.Session{Id: sessionId} - err := s.Unschedule(session) + s.unscheduleSession(session) + }) + s.event.On(event.INSTANCE_NEW, func(sessionId string, args ...interface{}) { + instanceName := args[0].(string) + log.Printf("EVENT: Instance New %s\n", instanceName) + instance, err := s.storage.InstanceGet(instanceName) if err != nil { - log.Println(err) + log.Printf("Instance [%s] was not found in storage. Got %s\n", instanceName, err) return } + s.scheduleInstance(instance) + }) + s.event.On(event.INSTANCE_DELETE, func(sessionId string, args ...interface{}) { + instanceName := args[0].(string) + log.Printf("EVENT: Instance Delete %s\n", instanceName) + instance := &types.Instance{Name: instanceName} + s.unscheduleInstance(instance) }) s.started = true -} - -func (s *scheduler) register(session *types.Session) *scheduledSession { - ss := &scheduledSession{session: session, busy: 0} - s.scheduledSessions[session.Id] = ss - return ss -} - -func (s *scheduler) cron(ctx context.Context, session *scheduledSession) { - for { - select { - case <-session.ticker.C: - if atomic.CompareAndSwapInt32(&session.busy, 0, 1) { - if time.Now().After(session.session.ExpiresAt) { - // Session has expired. Need to close the session. - s.pwd.SessionClose(session.session) - return - } else { - s.processSession(ctx, session.session) - } - atomic.StoreInt32(&session.busy, 0) - } else { - log.Printf("Session [%s] is currently busy. Will try next time.\n", session.session.Id) - } - case <-ctx.Done(): - return - } - } -} - -func (s *scheduler) processSession(ctx context.Context, session *types.Session) { - updatedSession, err := s.storage.SessionGet(session.Id) - if err != nil { - if storage.NotFound(err) { - log.Printf("Session [%s] was not found in storage. Unscheduling.\n", session.Id) - s.Unschedule(session) - } else { - log.Printf("Cannot process session. Got %s\n", err) - } - return - } - - instances, err := s.storage.InstanceFindBySessionId(updatedSession.Id) - if err != nil { - log.Printf("Couldn't find instances for session [%s]. Got: %v\n", updatedSession.Id, err) - return - } - wg := sync.WaitGroup{} - wg.Add(len(instances)) - for _, ins := range instances { - go func(ins *types.Instance) { - s.processInstance(ctx, ins) - wg.Done() - }(ins) - } - wg.Wait() -} - -func (s *scheduler) processInstance(ctx context.Context, instance *types.Instance) { - wg := sync.WaitGroup{} - wg.Add(len(s.tasks)) - for _, task := range s.tasks { - go func(task Task) { - task.Run(ctx, instance) - wg.Done() - }(task) - } - wg.Wait() -} - -func (s *scheduler) Schedule(session *types.Session) error { - if !s.started { - return fmt.Errorf("Can only schedule sessions after the scheduler has been started.") - } - if _, found := s.scheduledSessions[session.Id]; found { - return fmt.Errorf("Session [%s] was already scheduled", session.Id) - } - scheduledSession := s.register(session) - ctx, cancel := context.WithCancel(context.Background()) - scheduledSession.cancel = cancel - scheduledSession.ticker = time.NewTicker(1 * time.Second) - go s.cron(ctx, scheduledSession) - - log.Printf("Scheduled session [%s]\n", session.Id) - - return nil -} - -func (s *scheduler) Unschedule(session *types.Session) error { - if !s.started { - return fmt.Errorf("Can only schedule sessions after the scheduler has been started.") - } - if _, found := s.scheduledSessions[session.Id]; !found { - return fmt.Errorf("Session [%s] in not scheduled", session.Id) - } - - scheduledSession := s.scheduledSessions[session.Id] - scheduledSession.cancel() - scheduledSession.ticker.Stop() - delete(s.scheduledSessions, session.Id) - - log.Printf("Unscheduled session [%s]\n", session.Id) return nil } diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go deleted file mode 100644 index 152aa76..0000000 --- a/scheduler/scheduler_test.go +++ /dev/null @@ -1,207 +0,0 @@ -package scheduler - -import ( - "context" - "io/ioutil" - "log" - "os" - "sync" - "testing" - "time" - - "github.com/play-with-docker/play-with-docker/event" - "github.com/play-with-docker/play-with-docker/pwd" - "github.com/play-with-docker/play-with-docker/pwd/types" - "github.com/play-with-docker/play-with-docker/storage" - "github.com/stretchr/testify/assert" -) - -type mockTask struct { - name string - run func(ctx context.Context, instance *types.Instance) error -} - -func (m *mockTask) Name() string { - return m.name -} -func (m *mockTask) Run(ctx context.Context, instance *types.Instance) error { - return m.run(ctx, instance) -} - -func mockStorage() storage.StorageApi { - tmpfile, err := ioutil.TempFile("", "pwd") - if err != nil { - log.Fatal(err) - } - tmpfile.Close() - os.Remove(tmpfile.Name()) - store, _ := storage.NewFileStorage(tmpfile.Name()) - return store -} - -func TestNew(t *testing.T) { - store := mockStorage() - - s := &types.Session{ - Id: "aaabbbccc", - ExpiresAt: time.Now().Add(time.Hour), - } - - i := &types.Instance{ - SessionId: s.Id, - Name: "node1", - IP: "10.0.0.1", - } - err := store.SessionPut(s) - assert.Nil(t, err) - - err = store.InstancePut(i) - assert.Nil(t, err) - - sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{}) - assert.Nil(t, err) - assert.Len(t, sch.scheduledSessions, 1) -} - -func TestAddTask(t *testing.T) { - store := mockStorage() - sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{}) - assert.Nil(t, err) - - task := &mockTask{name: "FooBar"} - err = sch.AddTask(task) - assert.Nil(t, err) - - err = sch.AddTask(task) - assert.NotNil(t, err) - - assert.Equal(t, map[string]Task{"FooBar": task}, sch.tasks) -} - -func TestRemoveTask(t *testing.T) { - store := mockStorage() - sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{}) - assert.Nil(t, err) - - task := &mockTask{name: "FooBar"} - err = sch.AddTask(task) - assert.Nil(t, err) - - err = sch.RemoveTask(task) - assert.Nil(t, err) - - err = sch.RemoveTask(task) - assert.NotNil(t, err) - - assert.Equal(t, map[string]Task{}, sch.tasks) -} - -func TestStart(t *testing.T) { - store := mockStorage() - - s := &types.Session{ - Id: "aaabbbccc", - ExpiresAt: time.Now().Add(time.Hour), - } - - i := &types.Instance{ - SessionId: s.Id, - Name: "node1", - IP: "10.0.0.1", - } - err := store.SessionPut(s) - assert.Nil(t, err) - - err = store.InstancePut(i) - assert.Nil(t, err) - - sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{}) - assert.Nil(t, err) - - wg := sync.WaitGroup{} - wg.Add(1) - ran := false - task := &mockTask{name: "FooBar", run: func(ctx context.Context, instance *types.Instance) error { - ran = true - wg.Done() - return nil - }} - err = sch.AddTask(task) - assert.Nil(t, err) - - sch.Start() - defer sch.Stop() - wg.Wait() - assert.True(t, ran) -} - -func TestScheduleFromEvent(t *testing.T) { - s := &types.Session{ - Id: "aaaabbbbcccc", - ExpiresAt: time.Now().Add(time.Hour), - } - lb := event.NewLocalBroker() - store := mockStorage() - store.SessionPut(s) - sch, err := NewScheduler(store, lb, &pwd.Mock{}) - assert.Nil(t, err) - - sch.Start() - defer sch.Stop() - - lb.Emit(event.SESSION_NEW, s.Id) - - time.Sleep(time.Second) - - assert.Len(t, sch.scheduledSessions, 1) -} - -func TestUnscheduleFromEvent(t *testing.T) { - s := &types.Session{ - Id: "aaaabbbbcccc", - ExpiresAt: time.Now().Add(time.Hour), - } - lb := event.NewLocalBroker() - store := mockStorage() - store.SessionPut(s) - sch, err := NewScheduler(store, lb, &pwd.Mock{}) - assert.Nil(t, err) - - sch.Start() - defer sch.Stop() - - lb.Emit(event.SESSION_END, s.Id) - - time.Sleep(time.Second) - - assert.Len(t, sch.scheduledSessions, 0) -} - -func TestCloseSession(t *testing.T) { - _e := event.NewLocalBroker() - _p := &pwd.Mock{} - _s := mockStorage() - - s := &types.Session{ - Id: "aaaabbbbcccc", - ExpiresAt: time.Now().Add(-1 * time.Hour), - } - _p.On("SessionClose", s).Return(nil) - - _s.SessionPut(s) - - sch, err := NewScheduler(_s, _e, _p) - assert.Nil(t, err) - - sch.Start() - defer sch.Stop() - - time.Sleep(2 * time.Second) - - _p.AssertExpectations(t) - - _e.Emit(event.SESSION_END, s.Id) - time.Sleep(time.Second) - - assert.Len(t, sch.scheduledSessions, 0) -}