From a3e5121f7bb41a036d86d0c82af30e3192687851 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 17 Feb 2015 23:48:42 -0800 Subject: [PATCH] Make channels more aware of errors and shutdown on error - some cleanup --- pkg/server/server.go | 29 ++++++++++++++--------------- pkg/storage/fs/fs.go | 38 ++++++++++++++++++-------------------- 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index 1b44e0203..3aad8bf90 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -18,7 +18,6 @@ package server import ( "log" - "os" "path" "reflect" @@ -129,14 +128,8 @@ func getStorageChannels(storageType StorageType) (ctrlChans []chan<- string, sta case storageType == FileStorage: { homeDir := helpers.HomeDir() - rootPath := path.Join(homeDir, "minio-storage") - _, err := os.Stat(rootPath) - if os.IsNotExist(err) { - err = os.Mkdir(rootPath, 0700) - } else if err != nil { - log.Fatal("Could not create $HOME/minio-storage", err) - } - ctrlChan, statusChan, storage = fs.Start(rootPath) + root := path.Join(homeDir, "minio-storage") + ctrlChan, statusChan, storage = fs.Start(root) ctrlChans = append(ctrlChans, ctrlChan) statusChans = append(statusChans, statusChan) } @@ -147,17 +140,23 @@ func getStorageChannels(storageType StorageType) (ctrlChans []chan<- string, sta } func Start(configs []ServerConfig) { - // listen for critical errors - // TODO Handle critical errors appropriately when they arise // reflected looping is necessary to remove dead channels from loop and not flood switch - _, statusChans := getHttpChannels(configs) + ctrlChans, statusChans := getHttpChannels(configs) cases := createSelectCases(statusChans) for len(cases) > 0 { chosen, value, recvOk := reflect.Select(cases) - if recvOk == true { + switch recvOk { + case true: // Status Message Received - log.Println(chosen, value.Interface(), recvOk) - } else { + switch true { + case value.Interface() != nil: + // For any error received cleanup all existing channels and fail + for _, ch := range ctrlChans { + close(ch) + } + log.Fatal(value.Interface()) + } + case false: // Channel closed, remove from list var aliveStatusChans []<-chan error for i, ch := range statusChans { diff --git a/pkg/storage/fs/fs.go b/pkg/storage/fs/fs.go index bd8df7116..3d0c08dba 100644 --- a/pkg/storage/fs/fs.go +++ b/pkg/storage/fs/fs.go @@ -31,33 +31,31 @@ import ( ) type storage struct { - root string - writeLock sync.Mutex + root string + lock *sync.Mutex } type SerializedMetadata struct { ContentType string } -type MkdirFailedError struct{} - -func (self MkdirFailedError) Error() string { - return "Mkdir Failed" -} - func Start(root string) (chan<- string, <-chan error, *storage) { ctrlChannel := make(chan string) errorChannel := make(chan error) - go start(ctrlChannel, errorChannel) - return ctrlChannel, errorChannel, &storage{root: root} + s := storage{} + s.root = root + s.lock = new(sync.Mutex) + go start(ctrlChannel, errorChannel, &s) + return ctrlChannel, errorChannel, &s } -func start(ctrlChannel <-chan string, errorChannel chan<- error) { +func start(ctrlChannel <-chan string, errorChannel chan<- error, s *storage) { + err := os.MkdirAll(s.root, 0700) + errorChannel <- err close(errorChannel) } // Bucket Operations - func (storage *storage) ListBuckets() ([]mstorage.BucketMetadata, error) { files, err := ioutil.ReadDir(storage.root) if err != nil { @@ -83,8 +81,8 @@ func (storage *storage) ListBuckets() ([]mstorage.BucketMetadata, error) { } func (storage *storage) StoreBucket(bucket string) error { - storage.writeLock.Lock() - defer storage.writeLock.Unlock() + storage.lock.Lock() + defer storage.lock.Unlock() // verify bucket path legal if mstorage.IsValidBucket(bucket) == false { @@ -110,8 +108,8 @@ func (storage *storage) StoreBucket(bucket string) error { } func (storage *storage) GetBucketPolicy(bucket string) (interface{}, error) { - storage.writeLock.Lock() - defer storage.writeLock.Unlock() + storage.lock.Lock() + defer storage.lock.Unlock() var p policy.BucketPolicy // verify bucket path legal @@ -153,8 +151,8 @@ func (storage *storage) GetBucketPolicy(bucket string) (interface{}, error) { } func (storage *storage) StoreBucketPolicy(bucket string, policy interface{}) error { - storage.writeLock.Lock() - defer storage.writeLock.Unlock() + storage.lock.Lock() + defer storage.lock.Unlock() // verify bucket path legal if mstorage.IsValidBucket(bucket) == false { @@ -332,8 +330,8 @@ func (storage *storage) ListObjects(bucket, prefix string, count int) ([]mstorag func (storage *storage) StoreObject(bucket, key, contentType string, data io.Reader) error { // TODO Commits should stage then move instead of writing directly - storage.writeLock.Lock() - defer storage.writeLock.Unlock() + storage.lock.Lock() + defer storage.lock.Unlock() // check bucket name valid if mstorage.IsValidBucket(bucket) == false {