Remove priority queue, implement it using a simpler channelsmaster
parent
ebe61d99d9
commit
5cfb05465e
@ -0,0 +1,252 @@ |
||||
/* |
||||
* Minimalist Object Storage, (C) 2015 Minio, Inc. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or impliedc. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package donut |
||||
|
||||
import ( |
||||
"bytes" |
||||
"crypto/md5" |
||||
"encoding/base64" |
||||
"encoding/hex" |
||||
"io/ioutil" |
||||
"testing" |
||||
"time" |
||||
|
||||
. "github.com/minio/check" |
||||
) |
||||
|
||||
func TestCache(t *testing.T) { TestingT(t) } |
||||
|
||||
type MyCacheSuite struct{} |
||||
|
||||
var _ = Suite(&MyCacheSuite{}) |
||||
|
||||
var dc Cache |
||||
|
||||
func (s *MyCacheSuite) SetUpSuite(c *C) { |
||||
// no donut this time
|
||||
dc = NewCache(100000, time.Duration(1*time.Hour), "", nil) |
||||
// testing empty cache
|
||||
buckets, err := dc.ListBuckets() |
||||
c.Assert(err, IsNil) |
||||
c.Assert(len(buckets), Equals, 0) |
||||
} |
||||
|
||||
// test make bucket without name
|
||||
func (s *MyCacheSuite) TestBucketWithoutNameFails(c *C) { |
||||
// fail to create new bucket without a name
|
||||
err := dc.MakeBucket("", "private") |
||||
c.Assert(err, Not(IsNil)) |
||||
|
||||
err = dc.MakeBucket(" ", "private") |
||||
c.Assert(err, Not(IsNil)) |
||||
} |
||||
|
||||
// test empty bucket
|
||||
func (s *MyCacheSuite) TestEmptyBucket(c *C) { |
||||
c.Assert(dc.MakeBucket("foo1", "private"), IsNil) |
||||
// check if bucket is empty
|
||||
var resources BucketResourcesMetadata |
||||
resources.Maxkeys = 1 |
||||
objectsMetadata, resources, err := dc.ListObjects("foo1", resources) |
||||
c.Assert(err, IsNil) |
||||
c.Assert(len(objectsMetadata), Equals, 0) |
||||
c.Assert(resources.CommonPrefixes, DeepEquals, []string{}) |
||||
c.Assert(resources.IsTruncated, Equals, false) |
||||
} |
||||
|
||||
// test bucket list
|
||||
func (s *MyCacheSuite) TestMakeBucketAndList(c *C) { |
||||
// create bucket
|
||||
err := dc.MakeBucket("foo2", "private") |
||||
c.Assert(err, IsNil) |
||||
|
||||
// check bucket exists
|
||||
buckets, err := dc.ListBuckets() |
||||
c.Assert(err, IsNil) |
||||
c.Assert(len(buckets), Equals, 5) |
||||
c.Assert(buckets[0].ACL, Equals, BucketACL("private")) |
||||
} |
||||
|
||||
// test re-create bucket
|
||||
func (s *MyCacheSuite) TestMakeBucketWithSameNameFails(c *C) { |
||||
err := dc.MakeBucket("foo3", "private") |
||||
c.Assert(err, IsNil) |
||||
|
||||
err = dc.MakeBucket("foo3", "private") |
||||
c.Assert(err, Not(IsNil)) |
||||
} |
||||
|
||||
// test make multiple buckets
|
||||
func (s *MyCacheSuite) TestCreateMultipleBucketsAndList(c *C) { |
||||
// add a second bucket
|
||||
err := dc.MakeBucket("foo4", "private") |
||||
c.Assert(err, IsNil) |
||||
|
||||
err = dc.MakeBucket("bar1", "private") |
||||
c.Assert(err, IsNil) |
||||
|
||||
buckets, err := dc.ListBuckets() |
||||
c.Assert(err, IsNil) |
||||
|
||||
c.Assert(len(buckets), Equals, 2) |
||||
c.Assert(buckets[0].Name, Equals, "bar1") |
||||
c.Assert(buckets[1].Name, Equals, "foo4") |
||||
|
||||
err = dc.MakeBucket("foobar1", "private") |
||||
c.Assert(err, IsNil) |
||||
|
||||
buckets, err = dc.ListBuckets() |
||||
c.Assert(err, IsNil) |
||||
|
||||
c.Assert(len(buckets), Equals, 3) |
||||
c.Assert(buckets[2].Name, Equals, "foobar1") |
||||
} |
||||
|
||||
// test object create without bucket
|
||||
func (s *MyCacheSuite) TestNewObjectFailsWithoutBucket(c *C) { |
||||
_, err := dc.CreateObject("unknown", "obj", "", "", 0, nil) |
||||
c.Assert(err, Not(IsNil)) |
||||
} |
||||
|
||||
// test create object metadata
|
||||
func (s *MyCacheSuite) TestNewObjectMetadata(c *C) { |
||||
data := "Hello World" |
||||
hasher := md5.New() |
||||
hasher.Write([]byte(data)) |
||||
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) |
||||
reader := ioutil.NopCloser(bytes.NewReader([]byte(data))) |
||||
|
||||
err := dc.MakeBucket("foo6", "private") |
||||
c.Assert(err, IsNil) |
||||
|
||||
objectMetadata, err := dc.CreateObject("foo6", "obj", "application/json", expectedMd5Sum, int64(len(data)), reader) |
||||
c.Assert(err, IsNil) |
||||
c.Assert(objectMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil))) |
||||
c.Assert(objectMetadata.Metadata["contentType"], Equals, "application/json") |
||||
} |
||||
|
||||
// test create object fails without name
|
||||
func (s *MyCacheSuite) TestNewObjectFailsWithEmptyName(c *C) { |
||||
_, err := dc.CreateObject("foo", "", "", "", 0, nil) |
||||
c.Assert(err, Not(IsNil)) |
||||
} |
||||
|
||||
// test create object
|
||||
func (s *MyCacheSuite) TestNewObjectCanBeWritten(c *C) { |
||||
err := dc.MakeBucket("foo", "private") |
||||
c.Assert(err, IsNil) |
||||
|
||||
data := "Hello World" |
||||
|
||||
hasher := md5.New() |
||||
hasher.Write([]byte(data)) |
||||
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) |
||||
reader := ioutil.NopCloser(bytes.NewReader([]byte(data))) |
||||
|
||||
actualMetadata, err := dc.CreateObject("foo", "obj", "application/octet-stream", expectedMd5Sum, int64(len(data)), reader) |
||||
c.Assert(err, IsNil) |
||||
c.Assert(actualMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil))) |
||||
|
||||
var buffer bytes.Buffer |
||||
size, err := dc.GetObject(&buffer, "foo", "obj") |
||||
c.Assert(err, IsNil) |
||||
c.Assert(size, Equals, int64(len(data))) |
||||
c.Assert(buffer.Bytes(), DeepEquals, []byte(data)) |
||||
|
||||
actualMetadata, err = dc.GetObjectMetadata("foo", "obj") |
||||
c.Assert(err, IsNil) |
||||
c.Assert(hex.EncodeToString(hasher.Sum(nil)), Equals, actualMetadata.MD5Sum) |
||||
c.Assert(int64(len(data)), Equals, actualMetadata.Size) |
||||
} |
||||
|
||||
// test list objects
|
||||
func (s *MyCacheSuite) TestMultipleNewObjects(c *C) { |
||||
c.Assert(dc.MakeBucket("foo5", "private"), IsNil) |
||||
|
||||
one := ioutil.NopCloser(bytes.NewReader([]byte("one"))) |
||||
|
||||
_, err := dc.CreateObject("foo5", "obj1", "", "", int64(len("one")), one) |
||||
c.Assert(err, IsNil) |
||||
|
||||
two := ioutil.NopCloser(bytes.NewReader([]byte("two"))) |
||||
_, err = dc.CreateObject("foo5", "obj2", "", "", int64(len("two")), two) |
||||
c.Assert(err, IsNil) |
||||
|
||||
var buffer1 bytes.Buffer |
||||
size, err := dc.GetObject(&buffer1, "foo5", "obj1") |
||||
c.Assert(err, IsNil) |
||||
c.Assert(size, Equals, int64(len([]byte("one")))) |
||||
c.Assert(buffer1.Bytes(), DeepEquals, []byte("one")) |
||||
|
||||
var buffer2 bytes.Buffer |
||||
size, err = dc.GetObject(&buffer2, "foo5", "obj2") |
||||
c.Assert(err, IsNil) |
||||
c.Assert(size, Equals, int64(len([]byte("two")))) |
||||
|
||||
c.Assert(buffer2.Bytes(), DeepEquals, []byte("two")) |
||||
|
||||
/// test list of objects
|
||||
|
||||
// test list objects with prefix and delimiter
|
||||
var resources BucketResourcesMetadata |
||||
resources.Prefix = "o" |
||||
resources.Delimiter = "1" |
||||
resources.Maxkeys = 10 |
||||
objectsMetadata, resources, err := dc.ListObjects("foo5", resources) |
||||
c.Assert(err, IsNil) |
||||
c.Assert(resources.IsTruncated, Equals, false) |
||||
c.Assert(resources.CommonPrefixes[0], Equals, "obj1") |
||||
|
||||
// test list objects with only delimiter
|
||||
resources.Prefix = "" |
||||
resources.Delimiter = "1" |
||||
resources.Maxkeys = 10 |
||||
objectsMetadata, resources, err = dc.ListObjects("foo5", resources) |
||||
c.Assert(err, IsNil) |
||||
c.Assert(objectsMetadata[0].Object, Equals, "obj2") |
||||
c.Assert(resources.IsTruncated, Equals, false) |
||||
c.Assert(resources.CommonPrefixes[0], Equals, "obj1") |
||||
|
||||
// test list objects with only prefix
|
||||
resources.Prefix = "o" |
||||
resources.Delimiter = "" |
||||
resources.Maxkeys = 10 |
||||
objectsMetadata, resources, err = dc.ListObjects("foo5", resources) |
||||
c.Assert(err, IsNil) |
||||
c.Assert(resources.IsTruncated, Equals, false) |
||||
c.Assert(objectsMetadata[0].Object, Equals, "obj1") |
||||
c.Assert(objectsMetadata[1].Object, Equals, "obj2") |
||||
|
||||
three := ioutil.NopCloser(bytes.NewReader([]byte("three"))) |
||||
_, err = dc.CreateObject("foo5", "obj3", "", "", int64(len("three")), three) |
||||
c.Assert(err, IsNil) |
||||
|
||||
var buffer bytes.Buffer |
||||
size, err = dc.GetObject(&buffer, "foo5", "obj3") |
||||
c.Assert(err, IsNil) |
||||
c.Assert(size, Equals, int64(len([]byte("three")))) |
||||
c.Assert(buffer.Bytes(), DeepEquals, []byte("three")) |
||||
|
||||
// test list objects with maxkeys
|
||||
resources.Prefix = "o" |
||||
resources.Delimiter = "" |
||||
resources.Maxkeys = 2 |
||||
objectsMetadata, resources, err = dc.ListObjects("foo5", resources) |
||||
c.Assert(err, IsNil) |
||||
c.Assert(resources.IsTruncated, Equals, true) |
||||
c.Assert(len(objectsMetadata), Equals, 2) |
||||
} |
@ -1,70 +0,0 @@ |
||||
/* |
||||
* Minimalist Object Storage, (C) 2015 Minio, Inc. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package pq |
||||
|
||||
import "container/heap" |
||||
|
||||
// Item container for tasks in priority queue
|
||||
type Item struct { |
||||
task Task // task
|
||||
|
||||
// The index is needed by Fix and is maintained by the heap.Interface methods.
|
||||
index int // The index of the item in the heap.
|
||||
} |
||||
|
||||
// A PriorityQueue implements heap.Interface and holds Items.
|
||||
type PriorityQueue []*Item |
||||
|
||||
// Len length of current priority queue
|
||||
func (pq PriorityQueue) Len() int { return len(pq) } |
||||
|
||||
// Less used internally by heap.Interface to arrange items in order
|
||||
func (pq PriorityQueue) Less(i, j int) bool { |
||||
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
|
||||
return pq[i].task.GetPriority() > pq[j].task.GetPriority() |
||||
} |
||||
|
||||
// Swap used internally by heap.Interface to arrange incoming items
|
||||
func (pq PriorityQueue) Swap(i, j int) { |
||||
pq[i], pq[j] = pq[j], pq[i] |
||||
pq[i].index = i |
||||
pq[j].index = j |
||||
} |
||||
|
||||
// Push push items onto priority queue
|
||||
func (pq *PriorityQueue) Push(x interface{}) { |
||||
n := len(*pq) |
||||
item := x.(*Item) |
||||
item.index = n |
||||
*pq = append(*pq, item) |
||||
} |
||||
|
||||
// Pop pop items with highest priority
|
||||
func (pq *PriorityQueue) Pop() interface{} { |
||||
old := *pq |
||||
n := len(old) |
||||
item := old[n-1] |
||||
item.index = -1 // for safety
|
||||
*pq = old[0 : n-1] |
||||
return item |
||||
} |
||||
|
||||
// Fix modifies an item in-place on the queue
|
||||
func (pq *PriorityQueue) Fix(item *Item, task Task) { |
||||
item.task = task |
||||
heap.Fix(pq, item.index) |
||||
} |
@ -1,101 +0,0 @@ |
||||
/* |
||||
* Minimalist Object Storage, (C) 2015 Minio, Inc. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package pq |
||||
|
||||
import ( |
||||
"container/heap" |
||||
"fmt" |
||||
"testing" |
||||
|
||||
. "github.com/minio/check" |
||||
) |
||||
|
||||
func Test(t *testing.T) { TestingT(t) } |
||||
|
||||
type MySuite struct{} |
||||
|
||||
var _ = Suite(&MySuite{}) |
||||
|
||||
func helloTask1() <-chan error { |
||||
errCh := make(chan error) |
||||
go func() { |
||||
defer close(errCh) |
||||
println("Hello task1") |
||||
errCh <- nil |
||||
}() |
||||
return errCh |
||||
} |
||||
|
||||
func helloTask2() <-chan error { |
||||
errCh := make(chan error) |
||||
go func() { |
||||
defer close(errCh) |
||||
println("Hello task2") |
||||
errCh <- nil |
||||
}() |
||||
return errCh |
||||
} |
||||
|
||||
func newJob1() <-chan error { |
||||
errCh := make(chan error) |
||||
go func() { |
||||
defer close(errCh) |
||||
println("New Job1") |
||||
errCh <- nil |
||||
}() |
||||
return errCh |
||||
} |
||||
|
||||
func newJob2() <-chan error { |
||||
errCh := make(chan error) |
||||
go func() { |
||||
defer close(errCh) |
||||
println("New Job2") |
||||
errCh <- nil |
||||
}() |
||||
return errCh |
||||
} |
||||
|
||||
func (s *MySuite) TestPQ(c *C) { |
||||
// Create a priority queue, put the items in it, and
|
||||
// establish the priority queue (heap) invariants.
|
||||
pq := make(PriorityQueue, 2) |
||||
pq[0] = &Item{ |
||||
task: Task{job: helloTask1, priority: 2}, |
||||
index: 0, |
||||
} |
||||
pq[1] = &Item{ |
||||
task: Task{job: helloTask2, priority: 1}, |
||||
index: 1, |
||||
} |
||||
heap.Init(&pq) |
||||
|
||||
// Insert a new item and then modify its priority.
|
||||
item := &Item{ |
||||
task: Task{job: newJob1, priority: 5}, |
||||
} |
||||
heap.Push(&pq, item) |
||||
newTask := Task{job: newJob2, priority: 6} |
||||
pq.Fix(item, newTask) |
||||
|
||||
// Take the items out; they arrive in decreasing priority order.
|
||||
for pq.Len() > 0 { |
||||
item := heap.Pop(&pq).(*Item) |
||||
fmt.Printf("%.2d", item.task.GetPriority()) |
||||
item.task.Execute() |
||||
} |
||||
} |
@ -1,38 +0,0 @@ |
||||
/* |
||||
* Minimalist Object Storage, (C) 2015 Minio, Inc. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package pq |
||||
|
||||
// Task container for any generic tasks
|
||||
type Task struct { |
||||
job func() <-chan error |
||||
priority int |
||||
} |
||||
|
||||
// GetPriority get current task priority
|
||||
func (t Task) GetPriority() int { |
||||
return t.priority |
||||
} |
||||
|
||||
// UpdatePriority update current task priority
|
||||
func (t Task) UpdatePriority(p int) { |
||||
t.priority = p |
||||
} |
||||
|
||||
// Execute execute current task
|
||||
func (t Task) Execute() error { |
||||
return <-t.job() |
||||
} |
Loading…
Reference in new issue