Scheduler fixes
This commit is contained in:
@@ -108,12 +108,8 @@ func (s *scheduler) Start() {
|
|||||||
s.Schedule(session)
|
s.Schedule(session)
|
||||||
})
|
})
|
||||||
s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) {
|
s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) {
|
||||||
session, err := s.storage.SessionGet(sessionId)
|
session := &types.Session{Id: sessionId}
|
||||||
if err != nil {
|
err := s.Unschedule(session)
|
||||||
log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = s.Unschedule(session)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
return
|
return
|
||||||
@@ -146,7 +142,18 @@ func (s *scheduler) cron(ctx context.Context, session *scheduledSession) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *scheduler) processSession(ctx context.Context, session *types.Session) {
|
func (s *scheduler) processSession(ctx context.Context, session *types.Session) {
|
||||||
for _, ins := range session.Instances {
|
updatedSession, err := s.storage.SessionGet(session.Id)
|
||||||
|
if err != nil {
|
||||||
|
if storage.NotFound(err) {
|
||||||
|
log.Printf("Session [%s] was not found in storage. Unscheduling.\n", session.Id)
|
||||||
|
s.Unschedule(session)
|
||||||
|
} else {
|
||||||
|
log.Printf("Cannot process session. Got %s\n", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ins := range updatedSession.Instances {
|
||||||
go s.processInstance(ctx, ins)
|
go s.processInstance(ctx, ins)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -169,6 +176,9 @@ func (s *scheduler) Schedule(session *types.Session) error {
|
|||||||
scheduledSession.cancel = cancel
|
scheduledSession.cancel = cancel
|
||||||
scheduledSession.ticker = time.NewTicker(1 * time.Second)
|
scheduledSession.ticker = time.NewTicker(1 * time.Second)
|
||||||
go s.cron(ctx, scheduledSession)
|
go s.cron(ctx, scheduledSession)
|
||||||
|
|
||||||
|
log.Printf("Scheduled session [%s]\n", session.Id)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -185,5 +195,7 @@ func (s *scheduler) Unschedule(session *types.Session) error {
|
|||||||
scheduledSession.ticker.Stop()
|
scheduledSession.ticker.Stop()
|
||||||
delete(s.scheduledSessions, session.Id)
|
delete(s.scheduledSessions, session.Id)
|
||||||
|
|
||||||
|
log.Printf("Unscheduled session [%s]\n", session.Id)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user