From 0bc034239954e9da0fe4a5227a581e0c4baca2a0 Mon Sep 17 00:00:00 2001 From: "Jonathan Leibiusky @xetorthio" Date: Thu, 10 Aug 2017 17:18:34 -0300 Subject: [PATCH] Scheduler fixes --- scheduler/scheduler.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 94051b1..1f88f4b 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -108,12 +108,8 @@ func (s *scheduler) Start() { s.Schedule(session) }) s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) { - session, err := s.storage.SessionGet(sessionId) - if err != nil { - log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err) - return - } - err = s.Unschedule(session) + session := &types.Session{Id: sessionId} + err := s.Unschedule(session) if err != nil { log.Println(err) return @@ -146,7 +142,18 @@ func (s *scheduler) cron(ctx context.Context, session *scheduledSession) { } func (s *scheduler) processSession(ctx context.Context, session *types.Session) { - for _, ins := range session.Instances { + updatedSession, err := s.storage.SessionGet(session.Id) + if err != nil { + if storage.NotFound(err) { + log.Printf("Session [%s] was not found in storage. Unscheduling.\n", session.Id) + s.Unschedule(session) + } else { + log.Printf("Cannot process session. Got %s\n", err) + } + return + } + + for _, ins := range updatedSession.Instances { go s.processInstance(ctx, ins) } } @@ -169,6 +176,9 @@ func (s *scheduler) Schedule(session *types.Session) error { scheduledSession.cancel = cancel scheduledSession.ticker = time.NewTicker(1 * time.Second) go s.cron(ctx, scheduledSession) + + log.Printf("Scheduled session [%s]\n", session.Id) + return nil } @@ -185,5 +195,7 @@ func (s *scheduler) Unschedule(session *types.Session) error { scheduledSession.ticker.Stop() delete(s.scheduledSessions, session.Id) + log.Printf("Unscheduled session [%s]\n", session.Id) + return nil }