You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
minio/pkg/tasker/taskctl.go

158 lines
4.2 KiB

/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tasker
import (
"container/list"
"sync"
)
// TaskCtl (Task Controller) is a framework to create and manage
// tasks.
type TaskCtl struct {
mutex *sync.Mutex // Lock
// List of tasks managed by this task controller.
tasks *list.List
}
// New creates a new TaskCtl to create and control a collection of tasks. Single application can create multiple task controllers to manage different set of tasks separately.
func New(name string) *TaskCtl {
return &TaskCtl{
mutex: &sync.Mutex{},
tasks: list.New(),
}
}
// NewTask creates a new task structure and returns a handle to it. Only the task controller has access to the task structure. The caller routine only receives a handle to its task structure. Task handle is like a reference to task self. Caller is expected to listen for commands from the task controller and comply with it co-operatively.
func (tc *TaskCtl) NewTask(name string) Handle {
tc.mutex.Lock()
defer tc.mutex.Unlock()
// Create a new task.
tsk := newTask(name)
//Register this task in the TaskCtl's tasklist and save the reference.
tsk.this = tc.tasks.PushBack(tsk)
// Free task from the tasklist upon close call.
go func() {
// Release the tasks resources upon return of this function.
defer tsk.close()
// Will be notified here upon task's end of life.
this := <-tsk.closeCh
tc.mutex.Lock()
defer tc.mutex.Unlock()
// Release the task structure from the task list.
tc.tasks.Remove(this)
}()
// Return a handle to this task.
return tsk.getHandle()
}
// Shutdown ends all tasks, including the suspended ones.
func (tc *TaskCtl) Shutdown() {
tc.mutex.Lock()
defer tc.mutex.Unlock()
var wg sync.WaitGroup
// End all tasks.
for e := tc.tasks.Front(); e != nil; e = e.Next() {
wg.Add(1)
thisTask := e.Value.(task) // Make a local copy for go routine.
// End tasks in background. Flow of events from here is as follows: thisTask.handle.Close() -> tc.NewTask() -> this.task.close().
go func() { thisTask.getHandle().Close(); wg.Done() }()
}
wg.Wait() // Wait for all tasks to end gracefully.
// Reset the task pool.
tc.tasks = nil
}
// Suspend puts all tasks to sleep.
func (tc *TaskCtl) Suspend() bool {
tc.mutex.Lock()
defer tc.mutex.Unlock()
var wg sync.WaitGroup
// If any one of the task fails to suspend, this flag will set to false.
statusAll := make([]status, tc.tasks.Len())
// Suspend all tasks.
i := 0
for e := tc.tasks.Front(); e != nil; e = e.Next() {
wg.Add(1)
locTask := e.Value.(task) // Make a local copy for go routine.
locI := i // local i
// Suspend a task in background.
go func() {
defer wg.Done()
statusAll[locI] = locTask.command(CmdSignalSuspend)
}()
i++
}
wg.Wait() // Wait for all tasks to suspend gracefully.
for _, st := range statusAll {
if st.code != statusDone {
return false
}
}
return true
}
// Resume wakes up all suspended task from sleep.
func (tc *TaskCtl) Resume() bool {
tc.mutex.Lock()
defer tc.mutex.Unlock()
var wg sync.WaitGroup
// If any one of the task fails to suspend, this flag will set to false.
statusAll := make([]status, tc.tasks.Len())
i := 0
// Resume all suspended tasks.
for e := tc.tasks.Front(); e != nil; e = e.Next() {
wg.Add(1)
locTask := e.Value.(task) // Make a local copy for go routine.
locI := i // local i
// Resume a task in background.
go func() {
defer wg.Done()
statusAll[locI] = locTask.command(CmdSignalResume)
}()
i++
}
wg.Wait() // Wait for all tasks to resume.
for _, st := range statusAll {
if st.code != statusDone {
return false
}
}
return true
}