Tests are working again
This commit is contained in:
187
scheduler/scheduler.go
Normal file
187
scheduler/scheduler.go
Normal file
@@ -0,0 +1,187 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
"github.com/play-with-docker/play-with-docker/storage"
|
||||
)
|
||||
|
||||
type Task interface {
|
||||
Name() string
|
||||
Run(ctx context.Context, instance *types.Instance) error
|
||||
}
|
||||
|
||||
type SchedulerApi interface {
|
||||
Schedule(session *types.Session) error
|
||||
Unschedule(session *types.Session) error
|
||||
Start()
|
||||
Stop()
|
||||
AddTask(task Task) error
|
||||
RemoveTask(task Task) error
|
||||
}
|
||||
|
||||
type scheduledSession struct {
|
||||
session *types.Session
|
||||
cancel context.CancelFunc
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
type scheduler struct {
|
||||
scheduledSessions map[string]*scheduledSession
|
||||
storage storage.StorageApi
|
||||
event event.EventApi
|
||||
pwd pwd.PWDApi
|
||||
tasks map[string]Task
|
||||
started bool
|
||||
}
|
||||
|
||||
func NewScheduler(s storage.StorageApi, e event.EventApi, p pwd.PWDApi) (*scheduler, error) {
|
||||
sch := &scheduler{storage: s, event: e, pwd: p}
|
||||
|
||||
sch.tasks = make(map[string]Task)
|
||||
sch.scheduledSessions = make(map[string]*scheduledSession)
|
||||
|
||||
err := sch.loadFromStorage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sch, nil
|
||||
}
|
||||
|
||||
func (s *scheduler) loadFromStorage() error {
|
||||
sessions, err := s.storage.SessionGetAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, session := range sessions {
|
||||
s.register(session)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scheduler) AddTask(task Task) error {
|
||||
if _, found := s.tasks[task.Name()]; found {
|
||||
return fmt.Errorf("Task [%s] was already added", task.Name())
|
||||
}
|
||||
s.tasks[task.Name()] = task
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scheduler) RemoveTask(task Task) error {
|
||||
if _, found := s.tasks[task.Name()]; !found {
|
||||
return fmt.Errorf("Task [%s] doesn't exist", task.Name())
|
||||
}
|
||||
delete(s.tasks, task.Name())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scheduler) Stop() {
|
||||
for _, session := range s.scheduledSessions {
|
||||
s.Unschedule(session.session)
|
||||
}
|
||||
s.started = false
|
||||
}
|
||||
|
||||
func (s *scheduler) Start() {
|
||||
for _, session := range s.scheduledSessions {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
session.cancel = cancel
|
||||
session.ticker = time.NewTicker(1 * time.Second)
|
||||
go s.cron(ctx, session)
|
||||
}
|
||||
s.event.On(event.SESSION_NEW, func(sessionId string, args ...interface{}) {
|
||||
session, err := s.storage.SessionGet(sessionId)
|
||||
if err != nil {
|
||||
log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err)
|
||||
return
|
||||
}
|
||||
s.Schedule(session)
|
||||
})
|
||||
s.event.On(event.SESSION_END, func(sessionId string, args ...interface{}) {
|
||||
session, err := s.storage.SessionGet(sessionId)
|
||||
if err != nil {
|
||||
log.Printf("Session [%s] was not found in storage. Got %s\n", sessionId, err)
|
||||
return
|
||||
}
|
||||
err = s.Unschedule(session)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
})
|
||||
s.started = true
|
||||
}
|
||||
|
||||
func (s *scheduler) register(session *types.Session) *scheduledSession {
|
||||
s.scheduledSessions[session.Id] = &scheduledSession{session: session}
|
||||
return s.scheduledSessions[session.Id]
|
||||
}
|
||||
|
||||
func (s *scheduler) cron(ctx context.Context, session *scheduledSession) {
|
||||
for {
|
||||
select {
|
||||
case <-session.ticker.C:
|
||||
if time.Now().After(session.session.ExpiresAt) {
|
||||
// Session has expired. Need to close the session.
|
||||
s.pwd.SessionClose(session.session)
|
||||
return
|
||||
} else {
|
||||
s.processSession(ctx, session.session)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scheduler) processSession(ctx context.Context, session *types.Session) {
|
||||
for _, ins := range session.Instances {
|
||||
go s.processInstance(ctx, ins)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scheduler) processInstance(ctx context.Context, instance *types.Instance) {
|
||||
for _, task := range s.tasks {
|
||||
go task.Run(ctx, instance)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scheduler) Schedule(session *types.Session) error {
|
||||
if !s.started {
|
||||
return fmt.Errorf("Can only schedule sessions after the scheduler has been started.")
|
||||
}
|
||||
if _, found := s.scheduledSessions[session.Id]; found {
|
||||
return fmt.Errorf("Session [%s] was already scheduled", session.Id)
|
||||
}
|
||||
scheduledSession := s.register(session)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
scheduledSession.cancel = cancel
|
||||
go s.cron(ctx, scheduledSession)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scheduler) Unschedule(session *types.Session) error {
|
||||
if !s.started {
|
||||
return fmt.Errorf("Can only schedule sessions after the scheduler has been started.")
|
||||
}
|
||||
if _, found := s.scheduledSessions[session.Id]; !found {
|
||||
return fmt.Errorf("Session [%s] in not scheduled", session.Id)
|
||||
}
|
||||
|
||||
scheduledSession := s.scheduledSessions[session.Id]
|
||||
scheduledSession.cancel()
|
||||
scheduledSession.ticker.Stop()
|
||||
delete(s.scheduledSessions, session.Id)
|
||||
|
||||
return nil
|
||||
}
|
||||
201
scheduler/scheduler_test.go
Normal file
201
scheduler/scheduler_test.go
Normal file
@@ -0,0 +1,201 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
"github.com/play-with-docker/play-with-docker/storage"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type mockTask struct {
|
||||
name string
|
||||
run func(ctx context.Context, instance *types.Instance) error
|
||||
}
|
||||
|
||||
func (m *mockTask) Name() string {
|
||||
return m.name
|
||||
}
|
||||
func (m *mockTask) Run(ctx context.Context, instance *types.Instance) error {
|
||||
return m.run(ctx, instance)
|
||||
}
|
||||
|
||||
func mockStorage() storage.StorageApi {
|
||||
tmpfile, err := ioutil.TempFile("", "pwd")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
tmpfile.Close()
|
||||
os.Remove(tmpfile.Name())
|
||||
store, _ := storage.NewFileStorage(tmpfile.Name())
|
||||
return store
|
||||
}
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
store := mockStorage()
|
||||
|
||||
s := &types.Session{
|
||||
Id: "aaabbbccc",
|
||||
ExpiresAt: time.Now().Add(time.Hour),
|
||||
Instances: map[string]*types.Instance{
|
||||
"node1": &types.Instance{
|
||||
Name: "node1",
|
||||
IP: "10.0.0.1",
|
||||
},
|
||||
},
|
||||
}
|
||||
err := store.SessionPut(s)
|
||||
assert.Nil(t, err)
|
||||
|
||||
sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{})
|
||||
assert.Nil(t, err)
|
||||
assert.Len(t, sch.scheduledSessions, 1)
|
||||
}
|
||||
|
||||
func TestAddTask(t *testing.T) {
|
||||
store := mockStorage()
|
||||
sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{})
|
||||
assert.Nil(t, err)
|
||||
|
||||
task := &mockTask{name: "FooBar"}
|
||||
err = sch.AddTask(task)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = sch.AddTask(task)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
assert.Equal(t, map[string]Task{"FooBar": task}, sch.tasks)
|
||||
}
|
||||
|
||||
func TestRemoveTask(t *testing.T) {
|
||||
store := mockStorage()
|
||||
sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{})
|
||||
assert.Nil(t, err)
|
||||
|
||||
task := &mockTask{name: "FooBar"}
|
||||
err = sch.AddTask(task)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = sch.RemoveTask(task)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = sch.RemoveTask(task)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
assert.Equal(t, map[string]Task{}, sch.tasks)
|
||||
}
|
||||
|
||||
func TestStart(t *testing.T) {
|
||||
store := mockStorage()
|
||||
|
||||
s := &types.Session{
|
||||
Id: "aaabbbccc",
|
||||
ExpiresAt: time.Now().Add(time.Hour),
|
||||
Instances: map[string]*types.Instance{
|
||||
"node1": &types.Instance{
|
||||
Name: "node1",
|
||||
IP: "10.0.0.1",
|
||||
},
|
||||
},
|
||||
}
|
||||
err := store.SessionPut(s)
|
||||
assert.Nil(t, err)
|
||||
|
||||
sch, err := NewScheduler(store, event.NewLocalBroker(), &pwd.Mock{})
|
||||
assert.Nil(t, err)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
ran := false
|
||||
task := &mockTask{name: "FooBar", run: func(ctx context.Context, instance *types.Instance) error {
|
||||
ran = true
|
||||
wg.Done()
|
||||
return nil
|
||||
}}
|
||||
err = sch.AddTask(task)
|
||||
assert.Nil(t, err)
|
||||
|
||||
sch.Start()
|
||||
defer sch.Stop()
|
||||
wg.Wait()
|
||||
assert.True(t, ran)
|
||||
}
|
||||
|
||||
func TestScheduleFromEvent(t *testing.T) {
|
||||
s := &types.Session{
|
||||
Id: "aaaabbbbcccc",
|
||||
ExpiresAt: time.Now().Add(time.Hour),
|
||||
}
|
||||
lb := event.NewLocalBroker()
|
||||
store := mockStorage()
|
||||
store.SessionPut(s)
|
||||
sch, err := NewScheduler(store, lb, &pwd.Mock{})
|
||||
assert.Nil(t, err)
|
||||
|
||||
sch.Start()
|
||||
defer sch.Stop()
|
||||
|
||||
lb.Emit(event.SESSION_NEW, s.Id)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
assert.Len(t, sch.scheduledSessions, 1)
|
||||
}
|
||||
|
||||
func TestUnscheduleFromEvent(t *testing.T) {
|
||||
s := &types.Session{
|
||||
Id: "aaaabbbbcccc",
|
||||
ExpiresAt: time.Now().Add(time.Hour),
|
||||
}
|
||||
lb := event.NewLocalBroker()
|
||||
store := mockStorage()
|
||||
store.SessionPut(s)
|
||||
sch, err := NewScheduler(store, lb, &pwd.Mock{})
|
||||
assert.Nil(t, err)
|
||||
|
||||
sch.Start()
|
||||
defer sch.Stop()
|
||||
|
||||
lb.Emit(event.SESSION_END, s.Id)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
assert.Len(t, sch.scheduledSessions, 0)
|
||||
}
|
||||
|
||||
func TestCloseSession(t *testing.T) {
|
||||
_e := event.NewLocalBroker()
|
||||
_p := &pwd.Mock{}
|
||||
_s := mockStorage()
|
||||
|
||||
s := &types.Session{
|
||||
Id: "aaaabbbbcccc",
|
||||
ExpiresAt: time.Now().Add(-1 * time.Hour),
|
||||
}
|
||||
_p.On("SessionClose", s).Return(nil)
|
||||
|
||||
_s.SessionPut(s)
|
||||
|
||||
sch, err := NewScheduler(_s, _e, _p)
|
||||
assert.Nil(t, err)
|
||||
|
||||
sch.Start()
|
||||
defer sch.Stop()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
_p.AssertExpectations(t)
|
||||
|
||||
_e.Emit(event.SESSION_END, s.Id)
|
||||
time.Sleep(time.Second)
|
||||
|
||||
assert.Len(t, sch.scheduledSessions, 0)
|
||||
}
|
||||
55
scheduler/task/check_ports.go
Normal file
55
scheduler/task/check_ports.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/play-with-docker/play-with-docker/docker"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
)
|
||||
|
||||
type DockerPorts struct {
|
||||
Instance string `json:"instance"`
|
||||
Ports []int `json:"ports"`
|
||||
}
|
||||
|
||||
type checkPorts struct {
|
||||
event event.EventApi
|
||||
factory docker.FactoryApi
|
||||
}
|
||||
|
||||
var CheckPortsEvent event.EventType
|
||||
|
||||
func init() {
|
||||
CheckPortsEvent = event.NewEventType("instance docker ports")
|
||||
}
|
||||
|
||||
func (t *checkPorts) Name() string {
|
||||
return "CheckPorts"
|
||||
}
|
||||
|
||||
func (t *checkPorts) Run(ctx context.Context, instance *types.Instance) error {
|
||||
dockerClient, err := t.factory.GetForInstance(instance.SessionId, instance.Name)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
ps, err := dockerClient.GetPorts()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
ports := make([]int, len(ps))
|
||||
for i, port := range ps {
|
||||
ports[i] = int(port)
|
||||
}
|
||||
|
||||
t.event.Emit(CheckPortsEvent, instance.SessionId, DockerPorts{Instance: instance.Name, Ports: ports})
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewCheckPorts(e event.EventApi, f docker.FactoryApi) *checkPorts {
|
||||
return &checkPorts{event: e, factory: f}
|
||||
}
|
||||
48
scheduler/task/check_ports_test.go
Normal file
48
scheduler/task/check_ports_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/play-with-docker/play-with-docker/docker"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCheckPorts_Name(t *testing.T) {
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
task := NewCheckPorts(e, f)
|
||||
|
||||
assert.Equal(t, "CheckPorts", task.Name())
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestCheckPorts_Run(t *testing.T) {
|
||||
d := &docker.Mock{}
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
d.On("GetPorts").Return([]uint16{8080, 9090}, nil)
|
||||
f.On("GetForInstance", "aaaabbbbcccc", "aaaabbbb_node1").Return(d, nil)
|
||||
e.M.On("Emit", CheckPortsEvent, "aaaabbbbcccc", []interface{}{DockerPorts{Instance: "aaaabbbb_node1", Ports: []int{8080, 9090}}}).Return()
|
||||
|
||||
i := &types.Instance{
|
||||
IP: "10.0.0.1",
|
||||
Name: "aaaabbbb_node1",
|
||||
SessionId: "aaaabbbbcccc",
|
||||
}
|
||||
|
||||
task := NewCheckPorts(e, f)
|
||||
ctx := context.Background()
|
||||
|
||||
err := task.Run(ctx, i)
|
||||
|
||||
assert.Nil(t, err)
|
||||
d.AssertExpectations(t)
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
72
scheduler/task/check_swarm_ports.go
Normal file
72
scheduler/task/check_swarm_ports.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/play-with-docker/play-with-docker/docker"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
)
|
||||
|
||||
type DockerSwarmPorts struct {
|
||||
Manager string `json:"manager"`
|
||||
Instances []string `json:"instances"`
|
||||
Ports []int `json:"ports"`
|
||||
}
|
||||
|
||||
type checkSwarmPorts struct {
|
||||
event event.EventApi
|
||||
factory docker.FactoryApi
|
||||
}
|
||||
|
||||
var CheckSwarmPortsEvent event.EventType
|
||||
|
||||
func init() {
|
||||
CheckSwarmPortsEvent = event.NewEventType("instance docker swarm ports")
|
||||
}
|
||||
|
||||
func (t *checkSwarmPorts) Name() string {
|
||||
return "CheckSwarmPorts"
|
||||
}
|
||||
|
||||
func (t *checkSwarmPorts) Run(ctx context.Context, instance *types.Instance) error {
|
||||
dockerClient, err := t.factory.GetForInstance(instance.SessionId, instance.Name)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
status, err := getDockerSwarmStatus(ctx, dockerClient)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if !status.IsManager {
|
||||
return nil
|
||||
}
|
||||
|
||||
hosts, ps, err := dockerClient.GetSwarmPorts()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
instances := make([]string, len(hosts))
|
||||
sessionPrefix := instance.SessionId[:8]
|
||||
for i, host := range hosts {
|
||||
instances[i] = fmt.Sprintf("%s_%s", sessionPrefix, host)
|
||||
}
|
||||
ports := make([]int, len(ps))
|
||||
for i, port := range ps {
|
||||
ports[i] = int(port)
|
||||
}
|
||||
|
||||
t.event.Emit(CheckSwarmPortsEvent, instance.SessionId, DockerSwarmPorts{Manager: instance.Name, Instances: instances, Ports: ports})
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewCheckSwarmPorts(e event.EventApi, f docker.FactoryApi) *checkSwarmPorts {
|
||||
return &checkSwarmPorts{event: e, factory: f}
|
||||
}
|
||||
57
scheduler/task/check_swarm_ports_test.go
Normal file
57
scheduler/task/check_swarm_ports_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
dockerTypes "github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
"github.com/play-with-docker/play-with-docker/docker"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCheckSwarmPorts_Name(t *testing.T) {
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
task := NewCheckSwarmPorts(e, f)
|
||||
|
||||
assert.Equal(t, "CheckSwarmPorts", task.Name())
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestCheckSwarmPorts_RunWhenManager(t *testing.T) {
|
||||
d := &docker.Mock{}
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
i := &types.Instance{
|
||||
IP: "10.0.0.1",
|
||||
Name: "aaaabbbb_node1",
|
||||
SessionId: "aaaabbbbcccc",
|
||||
}
|
||||
info := dockerTypes.Info{
|
||||
Swarm: swarm.Info{
|
||||
LocalNodeState: swarm.LocalNodeStateActive,
|
||||
ControlAvailable: true,
|
||||
},
|
||||
}
|
||||
|
||||
f.On("GetForInstance", "aaaabbbbcccc", "aaaabbbb_node1").Return(d, nil)
|
||||
d.On("GetDaemonInfo").Return(info, nil)
|
||||
d.On("GetSwarmPorts").Return([]string{"node1", "node2"}, []uint16{8080, 9090}, nil)
|
||||
e.M.On("Emit", CheckSwarmPortsEvent, "aaaabbbbcccc", []interface{}{DockerSwarmPorts{Manager: i.Name, Instances: []string{i.Name, "aaaabbbb_node2"}, Ports: []int{8080, 9090}}}).Return()
|
||||
|
||||
task := NewCheckSwarmPorts(e, f)
|
||||
ctx := context.Background()
|
||||
|
||||
err := task.Run(ctx, i)
|
||||
|
||||
assert.Nil(t, err)
|
||||
d.AssertExpectations(t)
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
69
scheduler/task/check_swarm_status.go
Normal file
69
scheduler/task/check_swarm_status.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
"github.com/play-with-docker/play-with-docker/docker"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
)
|
||||
|
||||
type DockerSwarmStatus struct {
|
||||
IsManager bool `json:"is_manager"`
|
||||
IsWorker bool `json:"is_worker"`
|
||||
Instance string `json:"instance"`
|
||||
}
|
||||
|
||||
type checkSwarmStatus struct {
|
||||
event event.EventApi
|
||||
factory docker.FactoryApi
|
||||
}
|
||||
|
||||
var CheckSwarmStatusEvent event.EventType
|
||||
|
||||
func init() {
|
||||
CheckSwarmStatusEvent = event.NewEventType("instance docker swarm status")
|
||||
}
|
||||
|
||||
func (t *checkSwarmStatus) Name() string {
|
||||
return "CheckSwarmStatus"
|
||||
}
|
||||
|
||||
func (t *checkSwarmStatus) Run(ctx context.Context, instance *types.Instance) error {
|
||||
dockerClient, err := t.factory.GetForInstance(instance.SessionId, instance.Name)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
status, err := getDockerSwarmStatus(ctx, dockerClient)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
status.Instance = instance.Name
|
||||
|
||||
t.event.Emit(CheckSwarmStatusEvent, instance.SessionId, status)
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewCheckSwarmStatus(e event.EventApi, f docker.FactoryApi) *checkSwarmStatus {
|
||||
return &checkSwarmStatus{event: e, factory: f}
|
||||
}
|
||||
|
||||
func getDockerSwarmStatus(ctx context.Context, client docker.DockerApi) (DockerSwarmStatus, error) {
|
||||
status := DockerSwarmStatus{}
|
||||
info, err := client.GetDaemonInfo()
|
||||
if err != nil {
|
||||
return status, err
|
||||
}
|
||||
|
||||
if info.Swarm.LocalNodeState != swarm.LocalNodeStateInactive && info.Swarm.LocalNodeState != swarm.LocalNodeStateLocked {
|
||||
status.IsManager = info.Swarm.ControlAvailable
|
||||
status.IsWorker = !info.Swarm.ControlAvailable
|
||||
}
|
||||
|
||||
return status, nil
|
||||
}
|
||||
150
scheduler/task/check_swarm_status_test.go
Normal file
150
scheduler/task/check_swarm_status_test.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
dockerTypes "github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
"github.com/play-with-docker/play-with-docker/docker"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCheckSwarmStatus_Name(t *testing.T) {
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
task := NewCheckSwarmStatus(e, f)
|
||||
|
||||
assert.Equal(t, "CheckSwarmStatus", task.Name())
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestCheckSwarmStatus_RunWhenInactive(t *testing.T) {
|
||||
d := &docker.Mock{}
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
i := &types.Instance{
|
||||
IP: "10.0.0.1",
|
||||
Name: "node1",
|
||||
SessionId: "aaabbbccc",
|
||||
}
|
||||
infoInactive := dockerTypes.Info{
|
||||
Swarm: swarm.Info{
|
||||
LocalNodeState: swarm.LocalNodeStateInactive,
|
||||
},
|
||||
}
|
||||
|
||||
f.On("GetForInstance", "aaabbbccc", "node1").Return(d, nil)
|
||||
d.On("GetDaemonInfo").Return(infoInactive, nil)
|
||||
e.M.On("Emit", CheckSwarmStatusEvent, "aaabbbccc", []interface{}{DockerSwarmStatus{IsManager: false, IsWorker: false, Instance: "node1"}}).Return()
|
||||
|
||||
task := NewCheckSwarmStatus(e, f)
|
||||
ctx := context.Background()
|
||||
|
||||
err := task.Run(ctx, i)
|
||||
|
||||
assert.Nil(t, err)
|
||||
d.AssertExpectations(t)
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestCheckSwarmStatus_RunWhenLocked(t *testing.T) {
|
||||
d := &docker.Mock{}
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
i := &types.Instance{
|
||||
IP: "10.0.0.1",
|
||||
Name: "node1",
|
||||
SessionId: "aaabbbccc",
|
||||
}
|
||||
infoLocked := dockerTypes.Info{
|
||||
Swarm: swarm.Info{
|
||||
LocalNodeState: swarm.LocalNodeStateLocked,
|
||||
},
|
||||
}
|
||||
|
||||
f.On("GetForInstance", "aaabbbccc", "node1").Return(d, nil)
|
||||
d.On("GetDaemonInfo").Return(infoLocked, nil)
|
||||
e.M.On("Emit", CheckSwarmStatusEvent, "aaabbbccc", []interface{}{DockerSwarmStatus{IsManager: false, IsWorker: false, Instance: "node1"}}).Return()
|
||||
|
||||
task := NewCheckSwarmStatus(e, f)
|
||||
ctx := context.Background()
|
||||
|
||||
err := task.Run(ctx, i)
|
||||
|
||||
assert.Nil(t, err)
|
||||
d.AssertExpectations(t)
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestCheckSwarmStatus_RunWhenManager(t *testing.T) {
|
||||
d := &docker.Mock{}
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
i := &types.Instance{
|
||||
IP: "10.0.0.1",
|
||||
Name: "node1",
|
||||
SessionId: "aaabbbccc",
|
||||
}
|
||||
infoLocked := dockerTypes.Info{
|
||||
Swarm: swarm.Info{
|
||||
LocalNodeState: swarm.LocalNodeStateActive,
|
||||
ControlAvailable: true,
|
||||
},
|
||||
}
|
||||
|
||||
f.On("GetForInstance", "aaabbbccc", "node1").Return(d, nil)
|
||||
d.On("GetDaemonInfo").Return(infoLocked, nil)
|
||||
e.M.On("Emit", CheckSwarmStatusEvent, "aaabbbccc", []interface{}{DockerSwarmStatus{IsManager: true, IsWorker: false, Instance: "node1"}}).Return()
|
||||
|
||||
task := NewCheckSwarmStatus(e, f)
|
||||
ctx := context.Background()
|
||||
|
||||
err := task.Run(ctx, i)
|
||||
|
||||
assert.Nil(t, err)
|
||||
d.AssertExpectations(t)
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestCheckSwarmStatus_RunWhenWorker(t *testing.T) {
|
||||
d := &docker.Mock{}
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
i := &types.Instance{
|
||||
IP: "10.0.0.1",
|
||||
Name: "node1",
|
||||
SessionId: "aaabbbccc",
|
||||
}
|
||||
infoLocked := dockerTypes.Info{
|
||||
Swarm: swarm.Info{
|
||||
LocalNodeState: swarm.LocalNodeStateActive,
|
||||
ControlAvailable: false,
|
||||
},
|
||||
}
|
||||
|
||||
f.On("GetForInstance", "aaabbbccc", "node1").Return(d, nil)
|
||||
d.On("GetDaemonInfo").Return(infoLocked, nil)
|
||||
e.M.On("Emit", CheckSwarmStatusEvent, "aaabbbccc", []interface{}{DockerSwarmStatus{IsManager: false, IsWorker: true, Instance: "node1"}}).Return()
|
||||
|
||||
task := NewCheckSwarmStatus(e, f)
|
||||
ctx := context.Background()
|
||||
|
||||
err := task.Run(ctx, i)
|
||||
|
||||
assert.Nil(t, err)
|
||||
d.AssertExpectations(t)
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
93
scheduler/task/collect_stats.go
Normal file
93
scheduler/task/collect_stats.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
dockerTypes "github.com/docker/docker/api/types"
|
||||
units "github.com/docker/go-units"
|
||||
"github.com/play-with-docker/play-with-docker/docker"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
)
|
||||
|
||||
type InstanceStats struct {
|
||||
Instance string `json:"instance"`
|
||||
Mem string `json:"mem"`
|
||||
Cpu string `json:"cpu"`
|
||||
}
|
||||
|
||||
type collectStats struct {
|
||||
event event.EventApi
|
||||
factory docker.FactoryApi
|
||||
}
|
||||
|
||||
var CollectStatsEvent event.EventType
|
||||
|
||||
func init() {
|
||||
CollectStatsEvent = event.NewEventType("instance stats")
|
||||
}
|
||||
|
||||
func (t *collectStats) Name() string {
|
||||
return "CollectStats"
|
||||
}
|
||||
|
||||
func (t *collectStats) Run(ctx context.Context, instance *types.Instance) error {
|
||||
dockerClient, err := t.factory.GetForSession(instance.SessionId)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
reader, err := dockerClient.GetContainerStats(instance.Name)
|
||||
if err != nil {
|
||||
log.Println("Error while trying to collect instance stats", err)
|
||||
return err
|
||||
}
|
||||
dec := json.NewDecoder(reader)
|
||||
var v *dockerTypes.StatsJSON
|
||||
e := dec.Decode(&v)
|
||||
if e != nil {
|
||||
log.Println("Error while trying to collect instance stats", e)
|
||||
return err
|
||||
}
|
||||
stats := InstanceStats{Instance: instance.Name}
|
||||
// Memory
|
||||
var memPercent float64 = 0
|
||||
if v.MemoryStats.Limit != 0 {
|
||||
memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
|
||||
}
|
||||
mem := float64(v.MemoryStats.Usage)
|
||||
memLimit := float64(v.MemoryStats.Limit)
|
||||
|
||||
stats.Mem = fmt.Sprintf("%.2f%% (%s / %s)", memPercent, units.BytesSize(mem), units.BytesSize(memLimit))
|
||||
|
||||
// cpu
|
||||
previousCPU := v.PreCPUStats.CPUUsage.TotalUsage
|
||||
previousSystem := v.PreCPUStats.SystemUsage
|
||||
cpuPercent := calculateCPUPercentUnix(previousCPU, previousSystem, v)
|
||||
stats.Cpu = fmt.Sprintf("%.2f%%", cpuPercent)
|
||||
|
||||
t.event.Emit(CollectStatsEvent, instance.SessionId, stats)
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewCollectStats(e event.EventApi, f docker.FactoryApi) *collectStats {
|
||||
return &collectStats{event: e, factory: f}
|
||||
}
|
||||
|
||||
func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *dockerTypes.StatsJSON) float64 {
|
||||
var (
|
||||
cpuPercent = 0.0
|
||||
// calculate the change for the cpu usage of the container in between readings
|
||||
cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
|
||||
// calculate the change for the entire system between readings
|
||||
systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
|
||||
)
|
||||
|
||||
if systemDelta > 0.0 && cpuDelta > 0.0 {
|
||||
cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
|
||||
}
|
||||
return cpuPercent
|
||||
}
|
||||
71
scheduler/task/collect_stats_test.go
Normal file
71
scheduler/task/collect_stats_test.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
dockerTypes "github.com/docker/docker/api/types"
|
||||
"github.com/play-with-docker/play-with-docker/docker"
|
||||
"github.com/play-with-docker/play-with-docker/event"
|
||||
"github.com/play-with-docker/play-with-docker/pwd/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
type mockSessionProvider struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockSessionProvider) GetDocker(sessionId string) (docker.DockerApi, error) {
|
||||
args := m.Called(sessionId)
|
||||
|
||||
return args.Get(0).(docker.DockerApi), args.Error(1)
|
||||
}
|
||||
|
||||
type nopCloser struct {
|
||||
io.Reader
|
||||
}
|
||||
|
||||
func (nopCloser) Close() error { return nil }
|
||||
|
||||
func TestCollectStats_Name(t *testing.T) {
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
task := NewCollectStats(e, f)
|
||||
|
||||
assert.Equal(t, "CollectStats", task.Name())
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestCollectStats_Run(t *testing.T) {
|
||||
d := &docker.Mock{}
|
||||
e := &event.Mock{}
|
||||
f := &docker.FactoryMock{}
|
||||
|
||||
stats := dockerTypes.StatsJSON{}
|
||||
b, _ := json.Marshal(stats)
|
||||
i := &types.Instance{
|
||||
IP: "10.0.0.1",
|
||||
Name: "aaaabbbb_node1",
|
||||
SessionId: "aaaabbbbcccc",
|
||||
}
|
||||
|
||||
f.On("GetForSession", i.SessionId).Return(d, nil)
|
||||
d.On("GetContainerStats", i.Name).Return(nopCloser{bytes.NewReader(b)}, nil)
|
||||
e.M.On("Emit", CollectStatsEvent, "aaaabbbbcccc", []interface{}{InstanceStats{Instance: i.Name, Mem: "0.00% (0B / 0B)", Cpu: "0.00%"}}).Return()
|
||||
|
||||
task := NewCollectStats(e, f)
|
||||
ctx := context.Background()
|
||||
|
||||
err := task.Run(ctx, i)
|
||||
|
||||
assert.Nil(t, err)
|
||||
d.AssertExpectations(t)
|
||||
e.M.AssertExpectations(t)
|
||||
f.AssertExpectations(t)
|
||||
}
|
||||
Reference in New Issue
Block a user