diff --git a/pkg/storage/pq/pq.go b/pkg/storage/pq/pq.go new file mode 100644 index 000000000..c491f4bb6 --- /dev/null +++ b/pkg/storage/pq/pq.go @@ -0,0 +1,70 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pq + +import "container/heap" + +// Item container for tasks in priority queue +type Item struct { + task Task // task + + // The index is needed by Fix and is maintained by the heap.Interface methods. + index int // The index of the item in the heap. +} + +// A PriorityQueue implements heap.Interface and holds Items. +type PriorityQueue []*Item + +// Len length of current priority queue +func (pq PriorityQueue) Len() int { return len(pq) } + +// Less used internally by heap.Interface to arrange items in order +func (pq PriorityQueue) Less(i, j int) bool { + // We want Pop to give us the highest, not lowest, priority so we use greater than here. + return pq[i].task.GetPriority() > pq[j].task.GetPriority() +} + +// Swap used internally by heap.Interface to arrange incoming items +func (pq PriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +// Push push items onto priority queue +func (pq *PriorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*Item) + item.index = n + *pq = append(*pq, item) +} + +// Pop pop items with highest priority +func (pq *PriorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// Fix modifies an item in-place on the queue +func (pq *PriorityQueue) Fix(item *Item, task Task) { + item.task = task + heap.Fix(pq, item.index) +} diff --git a/pkg/storage/pq/pq_test.go b/pkg/storage/pq/pq_test.go new file mode 100644 index 000000000..0723d273f --- /dev/null +++ b/pkg/storage/pq/pq_test.go @@ -0,0 +1,81 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pq + +import ( + "container/heap" + "fmt" + "testing" + + . "github.com/minio/check" +) + +func Test(t *testing.T) { TestingT(t) } + +type MySuite struct{} + +var _ = Suite(&MySuite{}) + +func helloTask1() error { + fmt.Println("Hello task1") + return nil +} + +func helloTask2() error { + fmt.Println("Hello task2") + return nil +} + +func newJob1() error { + fmt.Println("New Job1") + return nil +} + +func newJob2() error { + fmt.Println("New Job2") + return nil +} + +func (s *MySuite) TestPQ(c *C) { + // Create a priority queue, put the items in it, and + // establish the priority queue (heap) invariants. + pq := make(PriorityQueue, 2) + pq[0] = &Item{ + task: Task{job: helloTask1, priority: 2}, + index: 0, + } + pq[1] = &Item{ + task: Task{job: helloTask2, priority: 1}, + index: 1, + } + heap.Init(&pq) + + // Insert a new item and then modify its priority. + item := &Item{ + task: Task{job: newJob1, priority: 5}, + } + heap.Push(&pq, item) + newTask := Task{job: newJob2, priority: 6} + pq.Fix(item, newTask) + + // Take the items out; they arrive in decreasing priority order. + for pq.Len() > 0 { + item := heap.Pop(&pq).(*Item) + fmt.Printf("%.2d", item.task.GetPriority()) + item.task.Execute() + } +} diff --git a/pkg/storage/pq/task.go b/pkg/storage/pq/task.go new file mode 100644 index 000000000..1c092fbd5 --- /dev/null +++ b/pkg/storage/pq/task.go @@ -0,0 +1,38 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pq + +// Task container for any generic tasks +type Task struct { + job func() error + priority int +} + +// GetPriority get current task priority +func (t Task) GetPriority() int { + return t.priority +} + +// UpdatePriority update current task priority +func (t Task) UpdatePriority(p int) { + t.priority = p +} + +// Execute execute current task +func (t Task) Execute() error { + return t.job() +}