Fix issues with multipart upload

master
Harshavardhana 10 years ago
parent 474954022e
commit 8b94c53345
  1. 2
      pkg/donut/cache/data/data.go
  2. 7
      pkg/donut/donut-v2.go
  3. 15
      pkg/donut/donut-v2_test.go
  4. 21
      pkg/server/nimble/http.go

@ -85,8 +85,6 @@ func (r *Cache) SetMaxSize(maxSize uint64) {
// Stats get current cache statistics // Stats get current cache statistics
func (r *Cache) Stats() Stats { func (r *Cache) Stats() Stats {
r.Lock()
defer r.Unlock()
return Stats{ return Stats{
Bytes: r.currentSize, Bytes: r.currentSize,
Items: r.items.Len(), Items: r.items.Len(),

@ -113,7 +113,12 @@ func New() (Interface, error) {
return nil, iodine.New(err, nil) return nil, iodine.New(err, nil)
} }
for k, v := range buckets { for k, v := range buckets {
a.storedBuckets.Set(k, v) var newBucket = storedBucket{}
newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]MultiPartSession)
newBucket.partMetadata = make(map[int]PartMetadata)
newBucket.bucketMetadata = v
a.storedBuckets.Set(k, newBucket)
} }
} }
return a, nil return a, nil

@ -22,6 +22,8 @@ import (
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"io/ioutil" "io/ioutil"
"os"
"path/filepath"
"testing" "testing"
. "github.com/minio/check" . "github.com/minio/check"
@ -29,13 +31,20 @@ import (
func TestCache(t *testing.T) { TestingT(t) } func TestCache(t *testing.T) { TestingT(t) }
type MyCacheSuite struct{} type MyCacheSuite struct {
root string
}
var _ = Suite(&MyCacheSuite{}) var _ = Suite(&MyCacheSuite{})
var dc Interface var dc Interface
func (s *MyCacheSuite) SetUpSuite(c *C) { func (s *MyCacheSuite) SetUpSuite(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
s.root = root
customConfigPath = filepath.Join(root, "donut.json")
var err error var err error
dc, err = New() dc, err = New()
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -46,6 +55,10 @@ func (s *MyCacheSuite) SetUpSuite(c *C) {
c.Assert(len(buckets), Equals, 0) c.Assert(len(buckets), Equals, 0)
} }
func (s *MyDonutSuite) TearDownSuite(c *C) {
os.RemoveAll(s.root)
}
// test make bucket without name // test make bucket without name
func (s *MyCacheSuite) TestBucketWithoutNameFails(c *C) { func (s *MyCacheSuite) TestBucketWithoutNameFails(c *C) {
// fail to create new bucket without a name // fail to create new bucket without a name

@ -25,7 +25,7 @@ type app struct {
errors chan error errors chan error
} }
func newApp(servers []*http.Server) *app { func newApp(servers ...*http.Server) *app {
return &app{ return &app{
servers: servers, servers: servers,
net: &nimbleNet{}, net: &nimbleNet{},
@ -64,7 +64,7 @@ func (a *app) wait() {
go func(s httpdown.Server) { go func(s httpdown.Server) {
defer wg.Done() defer wg.Done()
if err := s.Wait(); err != nil { if err := s.Wait(); err != nil {
a.errors <- err a.errors <- iodine.New(err, nil)
} }
}(s) }(s)
} }
@ -76,7 +76,7 @@ func (a *app) term(wg *sync.WaitGroup) {
go func(s httpdown.Server) { go func(s httpdown.Server) {
defer wg.Done() defer wg.Done()
if err := s.Stop(); err != nil { if err := s.Stop(); err != nil {
a.errors <- err a.errors <- iodine.New(err, nil)
} }
}(s) }(s)
} }
@ -84,7 +84,7 @@ func (a *app) term(wg *sync.WaitGroup) {
func (a *app) signalHandler(wg *sync.WaitGroup) { func (a *app) signalHandler(wg *sync.WaitGroup) {
ch := make(chan os.Signal, 10) ch := make(chan os.Signal, 10)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGUSR2) signal.Notify(ch, syscall.SIGTERM, os.Interrupt)
for { for {
sig := <-ch sig := <-ch
switch sig { switch sig {
@ -94,7 +94,7 @@ func (a *app) signalHandler(wg *sync.WaitGroup) {
signal.Stop(ch) signal.Stop(ch)
a.term(wg) a.term(wg)
return return
case syscall.SIGUSR2: case os.Interrupt:
// we only return here if there's an error, otherwise the new process // we only return here if there's an error, otherwise the new process
// will send us a TERM when it's ready to trigger the actual shutdown. // will send us a TERM when it's ready to trigger the actual shutdown.
if _, err := a.net.StartProcess(); err != nil { if _, err := a.net.StartProcess(); err != nil {
@ -107,9 +107,7 @@ func (a *app) signalHandler(wg *sync.WaitGroup) {
// ListenAndServe will serve the given http.Servers and will monitor for signals // ListenAndServe will serve the given http.Servers and will monitor for signals
// allowing for graceful termination (SIGTERM) or restart (SIGHUP). // allowing for graceful termination (SIGTERM) or restart (SIGHUP).
func ListenAndServe(servers ...*http.Server) error { func ListenAndServe(servers ...*http.Server) error {
ppid := os.Getppid() a := newApp(servers...)
a := newApp(servers)
// Acquire Listeners // Acquire Listeners
if err := a.listen(); err != nil { if err := a.listen(); err != nil {
@ -119,13 +117,6 @@ func ListenAndServe(servers ...*http.Server) error {
// Start serving. // Start serving.
a.serve() a.serve()
// Close the parent if we inherited and it wasn't init that started us.
if os.Getenv("LISTEN_FDS") != "" && ppid != 1 {
if err := syscall.Kill(ppid, syscall.SIGTERM); err != nil {
return iodine.New(err, nil)
}
}
waitDone := make(chan struct{}) waitDone := make(chan struct{})
go func() { go func() {
defer close(waitDone) defer close(waitDone)

Loading…
Cancel
Save