diff --git a/api.go b/api.go index 772bf67..fc10bb3 100644 --- a/api.go +++ b/api.go @@ -52,7 +52,7 @@ func main() { log.Fatalf("Cannot parse duration %s. Got: %v", config.DefaultSessionDuration, err) } - playground := types.Playground{Domain: config.PlaygroundDomain, DefaultDinDInstanceImage: config.DefaultDinDImage, AllowWindowsInstances: config.NoWindows, DefaultSessionDuration: d, AvailableDinDInstanceImages: []string{config.DefaultDinDImage}} + playground := types.Playground{Domain: config.PlaygroundDomain, DefaultDinDInstanceImage: config.DefaultDinDImage, AllowWindowsInstances: config.NoWindows, DefaultSessionDuration: d, AvailableDinDInstanceImages: []string{config.DefaultDinDImage}, Tasks: []string{".*"}} if _, err := core.PlaygroundNew(playground); err != nil { log.Fatalf("Cannot create default playground. Got: %v", err) } diff --git a/pwd/types/playground.go b/pwd/types/playground.go index fb5fe67..582ed10 100644 --- a/pwd/types/playground.go +++ b/pwd/types/playground.go @@ -79,4 +79,5 @@ type Playground struct { AllowWindowsInstances bool `json:"allow_windows_instances" bson:"allow_windows_instances"` DefaultSessionDuration time.Duration `json:"default_session_duration" bson:"default_session_duration"` Extras PlaygroundExtras `json:"extras" bson:"extras"` + Tasks []string `json:"tasks" bson:"tasks"` } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 75031e8..91f8c11 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "log" + "regexp" + "sync" "time" "github.com/play-with-docker/play-with-docker/event" @@ -28,21 +30,26 @@ type scheduledSession struct { } type scheduledInstance struct { - instance *types.Instance - ticker *time.Ticker - cancel context.CancelFunc - fails int + instance *types.Instance + playgroundId string + ticker *time.Ticker + cancel context.CancelFunc + fails int } type scheduler struct { scheduledSessions map[string]*scheduledSession scheduledInstances map[string]*scheduledInstance tasks map[string]Task + playgrounds map[string]*types.Playground + playgroundTasks map[string][]Task started bool + ticker *time.Ticker storage storage.StorageApi event event.EventApi pwd pwd.PWDApi + mx sync.Mutex } func NewScheduler(tasks []Task, s storage.StorageApi, e event.EventApi, p pwd.PWDApi) (*scheduler, error) { @@ -51,6 +58,8 @@ func NewScheduler(tasks []Task, s storage.StorageApi, e event.EventApi, p pwd.PW sch.tasks = make(map[string]Task) sch.scheduledSessions = make(map[string]*scheduledSession) sch.scheduledInstances = make(map[string]*scheduledInstance) + sch.playgrounds = make(map[string]*types.Playground) + sch.playgroundTasks = make(map[string][]Task) for _, task := range tasks { if err := sch.addTask(task); err != nil { @@ -61,6 +70,61 @@ func NewScheduler(tasks []Task, s storage.StorageApi, e event.EventApi, p pwd.PW return sch, nil } +func (s *scheduler) updatePlaygrounds() { + s.mx.Lock() + defer s.mx.Unlock() + + log.Printf("Updating playgrounds configuration\n") + for playgroundId, _ := range s.playgrounds { + playground, err := s.storage.PlaygroundGet(playgroundId) + if err != nil { + log.Printf("Could not find playground %s\n", playgroundId) + continue + } + s.playgrounds[playgroundId] = playground + matchedTasks := s.getMatchedTasks(playground) + s.playgroundTasks[playground.Id] = matchedTasks + } +} + +func (s *scheduler) schedulePlaygroundsUpdate() { + s.updatePlaygrounds() + s.ticker = time.NewTicker(time.Minute * 5) + go func() { + for range s.ticker.C { + s.updatePlaygrounds() + } + }() +} + +func (s *scheduler) getMatchedTasks(playground *types.Playground) []Task { + matchedTasks := []Task{} + for _, expr := range playground.Tasks { + for _, task := range s.tasks { + if expr == task.Name() { + matchedTasks = append(matchedTasks, task) + continue + } + matched, err := regexp.MatchString(expr, task.Name()) + if err != nil { + continue + } + if matched { + matchedTasks = append(matchedTasks, task) + continue + } + } + } + return matchedTasks +} + +func (s *scheduler) getTasks(playgroundId string) []Task { + s.mx.Lock() + defer s.mx.Unlock() + + return s.playgroundTasks[playgroundId] +} + func (s *scheduler) processSession(ctx context.Context, ss *scheduledSession) { defer s.unscheduleSession(ss.session) select { @@ -93,7 +157,7 @@ func (s *scheduler) processInstance(ctx context.Context, si *scheduledInstance) log.Printf("Error retrieving instance %s from storage. Got: %v\n", si.instance.Name, err) continue } - for _, task := range s.tasks { + for _, task := range s.getTasks(si.playgroundId) { err := task.Run(ctx, si.instance) if err != nil { log.Printf("Error running task %s on instance %s. Got: %v\n", task.Name(), si.instance.Name, err) @@ -145,12 +209,12 @@ func (s *scheduler) unscheduleInstance(instance *types.Instance) { delete(s.scheduledInstances, si.instance.Name) log.Printf("Unscheduled instance %s\n", instance.Name) } -func (s *scheduler) scheduleInstance(instance *types.Instance) { +func (s *scheduler) scheduleInstance(instance *types.Instance, playgroundId string) { if _, found := s.scheduledInstances[instance.Name]; found { log.Printf("Instance %s is already scheduled. Ignoring.\n", instance.Name) return } - si := &scheduledInstance{instance: instance} + si := &scheduledInstance{instance: instance, playgroundId: playgroundId} s.scheduledInstances[instance.Name] = si ctx, cancel := context.WithCancel(context.Background()) si.cancel = cancel @@ -160,6 +224,7 @@ func (s *scheduler) scheduleInstance(instance *types.Instance) { } func (s *scheduler) Stop() { + s.ticker.Stop() for _, ss := range s.scheduledSessions { s.unscheduleSession(ss.session) } @@ -176,6 +241,13 @@ func (s *scheduler) Start() error { } for _, session := range sessions { s.scheduleSession(session) + if _, found := s.playgrounds[session.PlaygroundId]; !found { + playground, err := s.storage.PlaygroundGet(session.PlaygroundId) + if err != nil { + return err + } + s.playgrounds[playground.Id] = playground + } instances, err := s.storage.InstanceFindBySessionId(session.Id) if err != nil { @@ -183,16 +255,31 @@ func (s *scheduler) Start() error { } for _, instance := range instances { - s.scheduleInstance(instance) + s.scheduleInstance(instance, session.PlaygroundId) } } + + // Refresh playground conf every 5 minutes + s.schedulePlaygroundsUpdate() + s.event.On(event.SESSION_NEW, func(sessionId string, args ...interface{}) { + s.mx.Lock() + defer s.mx.Unlock() + log.Printf("EVENT: Session New %s\n", sessionId) session, err := s.storage.SessionGet(sessionId) if err != nil { log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err) return } + if _, found := s.playgrounds[session.PlaygroundId]; !found { + playground, err := s.storage.PlaygroundGet(session.PlaygroundId) + if err != nil { + log.Printf("Could not find playground %s\n") + return + } + s.playgrounds[playground.Id] = playground + } s.scheduleSession(session) }) s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) { @@ -208,7 +295,12 @@ func (s *scheduler) Start() error { log.Printf("Instance [%s] was not found in storage. Got %s\n", instanceName, err) return } - s.scheduleInstance(instance) + session, err := s.storage.SessionGet(instance.SessionId) + if err != nil { + log.Printf("Session [%s] was not found in storage. Got %s\n", instance.SessionId, err) + return + } + s.scheduleInstance(instance, session.PlaygroundId) }) s.event.On(event.INSTANCE_DELETE, func(sessionId string, args ...interface{}) { instanceName := args[0].(string) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go new file mode 100644 index 0000000..a5fff51 --- /dev/null +++ b/scheduler/scheduler_test.go @@ -0,0 +1,58 @@ +package scheduler + +import ( + "context" + "testing" + + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/pwd" + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/play-with-docker/play-with-docker/storage" + "github.com/stretchr/testify/assert" +) + +type fakeTask struct { + name string +} + +func (f fakeTask) Name() string { + return f.name +} +func (f fakeTask) Run(ctx context.Context, instance *types.Instance) error { + return nil +} + +func TestScheduler_getMatchedTasks(t *testing.T) { + tasks := []Task{ + fakeTask{name: "docker_task1"}, + fakeTask{name: "docker_task2"}, + fakeTask{name: "k8s_task1"}, + fakeTask{name: "k8s_task2"}, + } + + _s := &storage.Mock{} + _e := &event.Mock{} + _p := &pwd.Mock{} + + s, err := NewScheduler(tasks, _s, _e, _p) + assert.Nil(t, err) + + // No matches + matched := s.getMatchedTasks(&types.Playground{Tasks: []string{}}) + assert.Empty(t, matched) + + // Match everything + matched = s.getMatchedTasks(&types.Playground{Tasks: []string{".*"}}) + assert.Subset(t, tasks, matched) + assert.Len(t, matched, len(tasks)) + + // Match some + matched = s.getMatchedTasks(&types.Playground{Tasks: []string{"docker_.*"}}) + assert.Subset(t, []Task{fakeTask{name: "docker_task1"}, fakeTask{name: "docker_task2"}}, matched) + assert.Len(t, matched, 2) + + // Match exactly + matched = s.getMatchedTasks(&types.Playground{Tasks: []string{"docker_task1", "docker_task3"}}) + assert.Subset(t, []Task{fakeTask{name: "docker_task1"}}, matched) + assert.Len(t, matched, 1) +} diff --git a/storage/file_test.go b/storage/file_test.go index fa77499..de987e0 100644 --- a/storage/file_test.go +++ b/storage/file_test.go @@ -707,5 +707,6 @@ func TestPlaygroundGetAll(t *testing.T) { found, err := storage.PlaygroundGetAll() assert.Nil(t, err) - assert.Equal(t, []*types.Playground{p1, p2}, found) + assert.Subset(t, []*types.Playground{p1, p2}, found) + assert.Len(t, found, 2) }