From b52697e6ad1c19b18a5e3173f9476fcf6b0fefa6 Mon Sep 17 00:00:00 2001 From: "Anand Babu (AB) Periasamy" Date: Thu, 8 Oct 2015 02:20:24 -0700 Subject: [PATCH] new task model minio server --- pkg/tasker/commands.go | 42 ++++++++++ pkg/tasker/handle.go | 56 +++++++++++++ pkg/tasker/status.go | 35 +++++++++ pkg/tasker/task.go | 106 +++++++++++++++++++++++++ pkg/tasker/taskctl.go | 157 +++++++++++++++++++++++++++++++++++++ pkg/tasker/taskctl_test.go | 38 +++++++++ 6 files changed, 434 insertions(+) create mode 100644 pkg/tasker/commands.go create mode 100644 pkg/tasker/handle.go create mode 100644 pkg/tasker/status.go create mode 100644 pkg/tasker/task.go create mode 100644 pkg/tasker/taskctl.go create mode 100644 pkg/tasker/taskctl_test.go diff --git a/pkg/tasker/commands.go b/pkg/tasker/commands.go new file mode 100644 index 000000000..6d752ba2a --- /dev/null +++ b/pkg/tasker/commands.go @@ -0,0 +1,42 @@ +/* + * Minio Cloud Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tasker + +// Command is number that uniquely identifies a command function. +type Command uint8 + +// Enumerate the task commands. +const ( + // CmdNOOP does nothing. It is a default placeholder. Uninitialized variable of this type will point to NOOP command by default. + CmdNOOP Command = iota + // CmdSignalEnd gracefully ends current task. Never ending tasks (loop over) or Batched jobs will not take the next iteration, but may finish the current state to completion. + CmdSignalEnd + // CmdSignalAbort ends the current task at hand immediately. It may still cleanup dangling issues quickly. + CmdSignalAbort + // CmdSignalSuspend suspends the current task. + CmdSignalSuspend + // CmdSignalResume resumes a suspended task. + CmdSignalResume + // CmdPriorityLow is optimized to conserve resources and complete the task at a slow pace. This option is ideal for batch processed tasks. + CmdPriorityLow + // CmdPriorityMedium is the default priority. It is a balanced option between resources and speed. + CmdPriorityMedium + // CmdPriorityHigh is optimized for speed. This option is ideal for short lived tasks (like meta-data related) that are latency sensitive. Use this option wisely. + CmdPriorityHigh + // CmdPrioritySuper is an exclusive priority. All tasks with priority lower than Super (including High) are paused temporarily until this task completes. Anytime you consider using this priority level, please seek for approval. + CmdPrioritySuper +) diff --git a/pkg/tasker/handle.go b/pkg/tasker/handle.go new file mode 100644 index 000000000..c3330dede --- /dev/null +++ b/pkg/tasker/handle.go @@ -0,0 +1,56 @@ +/* + * Minio Cloud Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tasker + +import "github.com/minio/minio/pkg/probe" + +// Handle as the name suggests is a handle (self reference) to its +// own task structure. Task has limited privileges over itself. Only the +// task controller (TaskCtl) can manage the task by sending commands to +// the task over channels. +type Handle struct { + this taskRef + cmdCh <-chan Command // Channel to receive commands from TaskCtl. + statusCh chan<- status // Channel to send completion status and error (if any) to TaskCtl. + closeCh chan<- taskRef // Channel to notify the TaskCtl about ending this task. +} + +// Listen returns a channel to receive commands. +func (t Handle) Listen() <-chan Command { + return t.cmdCh +} + +// StatusDone acknowledges successful completion of a command. +func (t Handle) StatusDone() { + t.statusCh <- status{code: statusDone, err: nil} +} + +// StatusBusy rejects a command with busy status. +func (t Handle) StatusBusy() { + t.statusCh <- status{code: statusBusy, err: nil} +} + +// StatusFail returns failure status. +func (t Handle) StatusFail(err *probe.Error) { + t.statusCh <- status{code: statusFail, err: err} +} + +// Close notifies the TaskCtl about the end of this Task. Owner of the +// task must invoke Close() when it is done performing its job. +func (t Handle) Close() { + t.closeCh <- t.this +} diff --git a/pkg/tasker/status.go b/pkg/tasker/status.go new file mode 100644 index 000000000..f1ed47eb3 --- /dev/null +++ b/pkg/tasker/status.go @@ -0,0 +1,35 @@ +/* + * Minio Cloud Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tasker + +import "github.com/minio/minio/pkg/probe" + +// StatusCode denotes the completion status of a command. +type statusCode int8 + +// Enumerate task return status codes. +const ( + statusDone statusCode = iota + statusBusy + statusFail +) + +// Status returns the completion status and error (if any) of a command. +type status struct { + code statusCode // Completion code. + err *probe.Error // Error if any. +} diff --git a/pkg/tasker/task.go b/pkg/tasker/task.go new file mode 100644 index 000000000..bddf06dba --- /dev/null +++ b/pkg/tasker/task.go @@ -0,0 +1,106 @@ +/* + * Minio Cloud Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tasker + +import ( + "container/list" + "sync" +) + +/* NOTE: +Task is a private entity. It is created and managed by TaskCtl +entirely. Only TaskCtl and Handle objects are exposed outside. +*/ + +/* taskRef is a unique reference ID to a task. It is assigned by the +TaskCtl during the creation of a task. All tasfRef variables are +named "this". */ +type taskRef *list.Element + +/* Task is an abstract concept built on top of Go routines and +channels. Tasks themselves are expected to co-operate and comply with +the TaskCtl commands. +*/ +type task struct { + mutex *sync.Mutex + + this taskRef // Refence to task entry in the TaskCtl's task list. + name string // Free form name. + priority Command // Current priority. + cmdCh chan Command // Channel to receive commands from TaskCtl. + statusCh chan status // Channel to send completion status and error (if any) to TaskCtl. + closeCh chan taskRef // Channel to notify the TaskCtl about ending this task. +} + +/* NewTask creates a new task structure and returns a handle to +it. Only the task controller has access to the task structure. The +caller routine only receives a handle to its task structure. Task +handle is like a reference to task self. Caller is expected to listen +for commands from the task controller and comply with it +co-operatively. +this: Task reference is unique identifier assigned by the TaskCtl. +name: Free form name of the task. Eg. "Late Night Disk Scrubber". */ +func newTask(name string) task { + return task{ + // this: Is set by the TaskCtl's NewTask function. + mutex: &sync.Mutex{}, + name: name, + priority: CmdPriorityMedium, + cmdCh: make(chan Command), + statusCh: make(chan status), + closeCh: make(chan taskRef), + } +} + +// getHandle returns a handle to the task. Handle has limited access to the task structure and it is safe to be exposed. +func (t task) getHandle() Handle { + t.mutex.Lock() + defer t.mutex.Unlock() + + // Make a handle with limited access to channels (only send or receive). + return Handle{ + cmdCh: t.cmdCh, + statusCh: t.statusCh, + closeCh: t.closeCh, + } +} + +// command method sends a command code to the task and returns its completion status. +func (t task) command(cmd Command) status { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.cmdCh <- cmd + return <-t.statusCh +} + +// close releases all the resources held by this task. +func (t task) close() { + t.mutex.Lock() + defer t.mutex.Unlock() + + // Task can be ended in 2 ways. + // 1) Calling application invokes Handle.Close(). + // 2) TaskCtl.Shutdown() ending the task's life. + // In either case, task.close() is invoked only via the + // TaskCtl. Handle.Close() only sends a message to the TaskCtl to + // initiate a close call. + + close(t.cmdCh) + close(t.statusCh) + close(t.closeCh) +} diff --git a/pkg/tasker/taskctl.go b/pkg/tasker/taskctl.go new file mode 100644 index 000000000..8ac0cb252 --- /dev/null +++ b/pkg/tasker/taskctl.go @@ -0,0 +1,157 @@ +/* + * Minio Cloud Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tasker + +import ( + "container/list" + "sync" +) + +// TaskCtl (Task Controller) is a framework to create and manage +// tasks. +type TaskCtl struct { + mutex *sync.Mutex // Lock + // List of tasks managed by this task controller. + tasks *list.List +} + +// New creates a new TaskCtl to create and control a collection of tasks. Single application can create multiple task controllers to manage different set of tasks separately. +func New(name string) *TaskCtl { + return &TaskCtl{ + mutex: &sync.Mutex{}, + tasks: list.New(), + } +} + +// NewTask creates a new task structure and returns a handle to it. Only the task controller has access to the task structure. The caller routine only receives a handle to its task structure. Task handle is like a reference to task self. Caller is expected to listen for commands from the task controller and comply with it co-operatively. +func (tc *TaskCtl) NewTask(name string) Handle { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + // Create a new task. + tsk := newTask(name) + + //Register this task in the TaskCtl's tasklist and save the reference. + tsk.this = tc.tasks.PushBack(tsk) + + // Free task from the tasklist upon close call. + go func() { + // Release the tasks resources upon return of this function. + defer tsk.close() + + // Will be notified here upon task's end of life. + this := <-tsk.closeCh + + tc.mutex.Lock() + defer tc.mutex.Unlock() + + // Release the task structure from the task list. + tc.tasks.Remove(this) + }() + + // Return a handle to this task. + return tsk.getHandle() +} + +// Shutdown ends all tasks, including the suspended ones. +func (tc *TaskCtl) Shutdown() { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + var wg sync.WaitGroup + + // End all tasks. + for e := tc.tasks.Front(); e != nil; e = e.Next() { + wg.Add(1) + thisTask := e.Value.(task) // Make a local copy for go routine. + // End tasks in background. Flow of events from here is as follows: thisTask.handle.Close() -> tc.NewTask() -> this.task.close(). + go func() { thisTask.getHandle().Close(); wg.Done() }() + } + + wg.Wait() // Wait for all tasks to end gracefully. + + // Reset the task pool. + tc.tasks = nil +} + +// Suspend puts all tasks to sleep. +func (tc *TaskCtl) Suspend() bool { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + var wg sync.WaitGroup + + // If any one of the task fails to suspend, this flag will set to false. + statusAll := make([]status, tc.tasks.Len()) + + // Suspend all tasks. + i := 0 + for e := tc.tasks.Front(); e != nil; e = e.Next() { + wg.Add(1) + locTask := e.Value.(task) // Make a local copy for go routine. + locI := i // local i + // Suspend a task in background. + go func() { + defer wg.Done() + statusAll[locI] = locTask.command(CmdSignalSuspend) + }() + i++ + } + + wg.Wait() // Wait for all tasks to suspend gracefully. + + for _, st := range statusAll { + if st.code != statusDone { + return false + } + } + return true +} + +// Resume wakes up all suspended task from sleep. +func (tc *TaskCtl) Resume() bool { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + var wg sync.WaitGroup + + // If any one of the task fails to suspend, this flag will set to false. + statusAll := make([]status, tc.tasks.Len()) + + i := 0 + // Resume all suspended tasks. + for e := tc.tasks.Front(); e != nil; e = e.Next() { + wg.Add(1) + locTask := e.Value.(task) // Make a local copy for go routine. + locI := i // local i + // Resume a task in background. + go func() { + defer wg.Done() + statusAll[locI] = locTask.command(CmdSignalResume) + }() + i++ + } + wg.Wait() // Wait for all tasks to resume. + + for _, st := range statusAll { + if st.code != statusDone { + return false + } + } + return true + +} diff --git a/pkg/tasker/taskctl_test.go b/pkg/tasker/taskctl_test.go new file mode 100644 index 000000000..4dae9c0ea --- /dev/null +++ b/pkg/tasker/taskctl_test.go @@ -0,0 +1,38 @@ +/* + * Quick - Quick key value store for config files and persistent state files + * + * Minio Client (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package tasker_test + +import ( + "testing" + + "github.com/minio/minio/pkg/tasker" + + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { TestingT(t) } + +type MySuite struct{} + +var _ = Suite(&MySuite{}) + +func (s *MySuite) TestCheckData(c *C) { + testTasks := tasker.New("Test Task") + testTasks.Shutdown() + // c.Assert(err, Not(IsNil)) +}