Merge pull request #173 from harshavardhana/pr_out_make_channels_more_aware_of_errors_and_shutdown_on_error_some_cleanup

master
Harshavardhana 10 years ago
commit d8e0be7575
  1. 29
      pkg/server/server.go
  2. 38
      pkg/storage/fs/fs.go

@ -18,7 +18,6 @@ package server
import ( import (
"log" "log"
"os"
"path" "path"
"reflect" "reflect"
@ -129,14 +128,8 @@ func getStorageChannels(storageType StorageType) (ctrlChans []chan<- string, sta
case storageType == FileStorage: case storageType == FileStorage:
{ {
homeDir := helpers.HomeDir() homeDir := helpers.HomeDir()
rootPath := path.Join(homeDir, "minio-storage") root := path.Join(homeDir, "minio-storage")
_, err := os.Stat(rootPath) ctrlChan, statusChan, storage = fs.Start(root)
if os.IsNotExist(err) {
err = os.Mkdir(rootPath, 0700)
} else if err != nil {
log.Fatal("Could not create $HOME/minio-storage", err)
}
ctrlChan, statusChan, storage = fs.Start(rootPath)
ctrlChans = append(ctrlChans, ctrlChan) ctrlChans = append(ctrlChans, ctrlChan)
statusChans = append(statusChans, statusChan) statusChans = append(statusChans, statusChan)
} }
@ -147,17 +140,23 @@ func getStorageChannels(storageType StorageType) (ctrlChans []chan<- string, sta
} }
func Start(configs []ServerConfig) { func Start(configs []ServerConfig) {
// listen for critical errors
// TODO Handle critical errors appropriately when they arise
// reflected looping is necessary to remove dead channels from loop and not flood switch // reflected looping is necessary to remove dead channels from loop and not flood switch
_, statusChans := getHttpChannels(configs) ctrlChans, statusChans := getHttpChannels(configs)
cases := createSelectCases(statusChans) cases := createSelectCases(statusChans)
for len(cases) > 0 { for len(cases) > 0 {
chosen, value, recvOk := reflect.Select(cases) chosen, value, recvOk := reflect.Select(cases)
if recvOk == true { switch recvOk {
case true:
// Status Message Received // Status Message Received
log.Println(chosen, value.Interface(), recvOk) switch true {
} else { case value.Interface() != nil:
// For any error received cleanup all existing channels and fail
for _, ch := range ctrlChans {
close(ch)
}
log.Fatal(value.Interface())
}
case false:
// Channel closed, remove from list // Channel closed, remove from list
var aliveStatusChans []<-chan error var aliveStatusChans []<-chan error
for i, ch := range statusChans { for i, ch := range statusChans {

@ -31,33 +31,31 @@ import (
) )
// storage is a filesystem-backed object store rooted at a single
// directory tree.
type storage struct {
	root string      // base directory under which all buckets live
	lock *sync.Mutex // serializes mutating operations on the tree
}

// SerializedMetadata carries per-object metadata.
// NOTE(review): only ContentType is visible from here — confirm the full
// schema against the code that writes/reads it.
type SerializedMetadata struct {
	ContentType string
}

// Start creates a storage rooted at root and launches its background
// startup goroutine. It returns the control channel, the status/error
// channel, and the storage handle.
func Start(root string) (chan<- string, <-chan error, *storage) {
	ctrlChannel := make(chan string)
	errorChannel := make(chan error)
	s := &storage{
		root: root,
		lock: new(sync.Mutex),
	}
	go start(ctrlChannel, errorChannel, s)
	return ctrlChannel, errorChannel, s
}

// start ensures the root directory exists, reports the result (nil on
// success) on errorChannel, then closes the channel to signal that
// startup is complete. ctrlChannel is accepted for interface symmetry
// but is not consumed here.
func start(ctrlChannel <-chan string, errorChannel chan<- error, s *storage) {
	errorChannel <- os.MkdirAll(s.root, 0700)
	close(errorChannel)
}
// Bucket Operations // Bucket Operations
func (storage *storage) ListBuckets() ([]mstorage.BucketMetadata, error) { func (storage *storage) ListBuckets() ([]mstorage.BucketMetadata, error) {
files, err := ioutil.ReadDir(storage.root) files, err := ioutil.ReadDir(storage.root)
if err != nil { if err != nil {
@ -83,8 +81,8 @@ func (storage *storage) ListBuckets() ([]mstorage.BucketMetadata, error) {
} }
func (storage *storage) StoreBucket(bucket string) error { func (storage *storage) StoreBucket(bucket string) error {
storage.writeLock.Lock() storage.lock.Lock()
defer storage.writeLock.Unlock() defer storage.lock.Unlock()
// verify bucket path legal // verify bucket path legal
if mstorage.IsValidBucket(bucket) == false { if mstorage.IsValidBucket(bucket) == false {
@ -110,8 +108,8 @@ func (storage *storage) StoreBucket(bucket string) error {
} }
func (storage *storage) GetBucketPolicy(bucket string) (interface{}, error) { func (storage *storage) GetBucketPolicy(bucket string) (interface{}, error) {
storage.writeLock.Lock() storage.lock.Lock()
defer storage.writeLock.Unlock() defer storage.lock.Unlock()
var p policy.BucketPolicy var p policy.BucketPolicy
// verify bucket path legal // verify bucket path legal
@ -153,8 +151,8 @@ func (storage *storage) GetBucketPolicy(bucket string) (interface{}, error) {
} }
func (storage *storage) StoreBucketPolicy(bucket string, policy interface{}) error { func (storage *storage) StoreBucketPolicy(bucket string, policy interface{}) error {
storage.writeLock.Lock() storage.lock.Lock()
defer storage.writeLock.Unlock() defer storage.lock.Unlock()
// verify bucket path legal // verify bucket path legal
if mstorage.IsValidBucket(bucket) == false { if mstorage.IsValidBucket(bucket) == false {
@ -332,8 +330,8 @@ func (storage *storage) ListObjects(bucket, prefix string, count int) ([]mstorag
func (storage *storage) StoreObject(bucket, key, contentType string, data io.Reader) error { func (storage *storage) StoreObject(bucket, key, contentType string, data io.Reader) error {
// TODO Commits should stage then move instead of writing directly // TODO Commits should stage then move instead of writing directly
storage.writeLock.Lock() storage.lock.Lock()
defer storage.writeLock.Unlock() defer storage.lock.Unlock()
// check bucket name valid // check bucket name valid
if mstorage.IsValidBucket(bucket) == false { if mstorage.IsValidBucket(bucket) == false {

Loading…
Cancel
Save