Use errorChannels only for services not for drivers, reduce them to use simple functions

master
Harshavardhana 10 years ago
parent b2bf90afbd
commit 42c0287943
  1. 6
      pkg/api/api_test.go
  2. 23
      pkg/server/httpserver/httpserver.go
  3. 9
      pkg/server/server.go
  4. 24
      pkg/storage/drivers/donut/donut.go
  5. 3
      pkg/storage/drivers/donut/donut_test.go
  6. 14
      pkg/storage/drivers/fs/fs.go
  7. 3
      pkg/storage/drivers/fs/fs_test.go
  8. 18
      pkg/storage/drivers/memory/memory.go
  9. 3
      pkg/storage/drivers/memory/memory_test.go

@ -59,7 +59,7 @@ var _ = Suite(&MySuite{
var _ = Suite(&MySuite{ var _ = Suite(&MySuite{
initDriver: func() (drivers.Driver, string) { initDriver: func() (drivers.Driver, string) {
_, _, driver := memory.Start(10000, 3*time.Hour) driver, _ := memory.NewDriver(10000, 3*time.Hour)
return driver, "" return driver, ""
}, },
}) })
@ -69,7 +69,7 @@ var _ = Suite(&MySuite{
root, _ := ioutil.TempDir(os.TempDir(), "minio-api") root, _ := ioutil.TempDir(os.TempDir(), "minio-api")
var roots []string var roots []string
roots = append(roots, root) roots = append(roots, root)
_, _, driver := donut.Start(roots, 10000, 3*time.Hour) driver, _ := donut.NewDriver(roots, 10000, 3*time.Hour)
return driver, root return driver, root
}, },
}) })
@ -77,7 +77,7 @@ var _ = Suite(&MySuite{
var _ = Suite(&MySuite{ var _ = Suite(&MySuite{
initDriver: func() (drivers.Driver, string) { initDriver: func() (drivers.Driver, string) {
root, _ := ioutil.TempDir(os.TempDir(), "minio-fs-api") root, _ := ioutil.TempDir(os.TempDir(), "minio-fs-api")
_, _, driver := filesystem.Start(root) driver, _ := filesystem.NewDriver(root)
return driver, root return driver, root
}, },
}) })

@ -44,10 +44,10 @@ func Start(handler http.Handler, config Config) (chan<- string, <-chan error, *S
return ctrlChannel, errorChannel, &server return ctrlChannel, errorChannel, &server
} }
func start(ctrlChannel <-chan string, errorChannel chan<- error, func start(ctrlChannel <-chan string, errorChannel chan<- error, router http.Handler, config Config, server *Server) {
router http.Handler, config Config, server *Server) { defer close(errorChannel)
var err error
var err error
// Minio server config // Minio server config
httpServer := &http.Server{ httpServer := &http.Server{
Addr: config.Address, Addr: config.Address,
@ -56,7 +56,10 @@ func start(ctrlChannel <-chan string, errorChannel chan<- error,
} }
host, port, err := net.SplitHostPort(config.Address) host, port, err := net.SplitHostPort(config.Address)
errorChannel <- err if err != nil {
errorChannel <- err
return
}
var hosts []string var hosts []string
switch { switch {
@ -67,9 +70,9 @@ func start(ctrlChannel <-chan string, errorChannel chan<- error,
errorChannel <- err errorChannel <- err
for _, addr := range addrs { for _, addr := range addrs {
if addr.Network() == "ip+net" { if addr.Network() == "ip+net" {
h := strings.Split(addr.String(), "/")[0] host := strings.Split(addr.String(), "/")[0]
if ip := net.ParseIP(h); ip.To4() != nil { if ip := net.ParseIP(host); ip.To4() != nil {
hosts = append(hosts, h) hosts = append(hosts, host)
} }
} }
} }
@ -86,6 +89,8 @@ func start(ctrlChannel <-chan string, errorChannel chan<- error,
} }
err = httpServer.ListenAndServeTLS(config.CertFile, config.KeyFile) err = httpServer.ListenAndServeTLS(config.CertFile, config.KeyFile)
} }
errorChannel <- err if err != nil {
close(errorChannel) errorChannel <- err
}
} }

@ -42,7 +42,7 @@ type MemoryFactory struct {
// GetStartServerFunc builds memory api server // GetStartServerFunc builds memory api server
func (f MemoryFactory) GetStartServerFunc() StartServerFunc { func (f MemoryFactory) GetStartServerFunc() StartServerFunc {
return func() (chan<- string, <-chan error) { return func() (chan<- string, <-chan error) {
_, _, driver := memory.Start(f.MaxMemory, f.Expiration) driver, _ := memory.NewDriver(f.MaxMemory, f.Expiration)
conf := api.Config{RateLimit: f.RateLimit} conf := api.Config{RateLimit: f.RateLimit}
conf.SetDriver(driver) conf.SetDriver(driver)
ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config) ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config)
@ -59,7 +59,7 @@ type FilesystemFactory struct {
// GetStartServerFunc builds memory api server // GetStartServerFunc builds memory api server
func (f FilesystemFactory) GetStartServerFunc() StartServerFunc { func (f FilesystemFactory) GetStartServerFunc() StartServerFunc {
return func() (chan<- string, <-chan error) { return func() (chan<- string, <-chan error) {
_, _, driver := fs.Start(f.Path) driver, _ := fs.NewDriver(f.Path)
conf := api.Config{RateLimit: f.RateLimit} conf := api.Config{RateLimit: f.RateLimit}
conf.SetDriver(driver) conf.SetDriver(driver)
ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config) ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config)
@ -91,7 +91,10 @@ type DonutFactory struct {
// GetStartServerFunc DonutFactory builds donut api server // GetStartServerFunc DonutFactory builds donut api server
func (f DonutFactory) GetStartServerFunc() StartServerFunc { func (f DonutFactory) GetStartServerFunc() StartServerFunc {
return func() (chan<- string, <-chan error) { return func() (chan<- string, <-chan error) {
_, _, driver := donut.Start(f.Paths, f.MaxMemory, f.Expiration) driver, err := donut.NewDriver(f.Paths, f.MaxMemory, f.Expiration)
if err != nil {
log.Fatalln(err)
}
conf := api.Config{RateLimit: f.RateLimit} conf := api.Config{RateLimit: f.RateLimit}
conf.SetDriver(driver) conf.SetDriver(driver)
ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config) ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config)

@ -101,18 +101,18 @@ func createNodeDiskMap(paths []string) map[string][]string {
return nodes return nodes
} }
func initialize(d *donutDriver) { func initialize(d *donutDriver) error {
// Soon to be user configurable, when Management API is available // Soon to be user configurable, when Management API is available
// we should remove "default" to something which is passed down // we should remove "default" to something which is passed down
// from configuration paramters // from configuration paramters
var err error var err error
d.donut, err = donut.NewDonut("default", createNodeDiskMap(d.paths)) d.donut, err = donut.NewDonut("default", createNodeDiskMap(d.paths))
if err != nil { if err != nil {
panic(iodine.New(err, nil)) return iodine.New(err, nil)
} }
buckets, err := d.donut.ListBuckets() buckets, err := d.donut.ListBuckets()
if err != nil { if err != nil {
panic(iodine.New(err, nil)) return iodine.New(err, nil)
} }
for bucketName, metadata := range buckets { for bucketName, metadata := range buckets {
d.lock.RLock() d.lock.RLock()
@ -136,13 +136,11 @@ func initialize(d *donutDriver) {
d.storedBuckets[bucketName] = storedBucket d.storedBuckets[bucketName] = storedBucket
d.lock.Unlock() d.lock.Unlock()
} }
return nil
} }
// Start a single disk subsystem // NewDriver instantiate a donut driver
func Start(paths []string, maxSize uint64, expiration time.Duration) (chan<- string, <-chan error, drivers.Driver) { func NewDriver(paths []string, maxSize uint64, expiration time.Duration) (drivers.Driver, error) {
ctrlChannel := make(chan string)
errorChannel := make(chan error)
driver := new(donutDriver) driver := new(donutDriver)
driver.storedBuckets = make(map[string]storedBucket) driver.storedBuckets = make(map[string]storedBucket)
driver.objects = trove.NewCache(maxSize, expiration) driver.objects = trove.NewCache(maxSize, expiration)
@ -160,14 +158,8 @@ func Start(paths []string, maxSize uint64, expiration time.Duration) (chan<- str
driver.paths = paths driver.paths = paths
driver.lock = new(sync.RWMutex) driver.lock = new(sync.RWMutex)
initialize(driver) err := initialize(driver)
return driver, err
go start(ctrlChannel, errorChannel, driver)
return ctrlChannel, errorChannel, driver
}
func start(ctrlChannel <-chan string, errorChannel chan<- error, driver *donutDriver) {
defer close(errorChannel)
} }
func (d donutDriver) expiredObject(a ...interface{}) { func (d donutDriver) expiredObject(a ...interface{}) {

@ -40,7 +40,8 @@ func (s *MySuite) TestAPISuite(c *C) {
c.Check(err, IsNil) c.Check(err, IsNil)
storageList = append(storageList, p) storageList = append(storageList, p)
paths = append(paths, p) paths = append(paths, p)
_, _, store := Start(paths, 1000000, 3*time.Hour) store, err := NewDriver(paths, 1000000, 3*time.Hour)
c.Check(err, IsNil)
return store return store
} }
drivers.APITestSuite(c, create) drivers.APITestSuite(c, create)

@ -29,22 +29,14 @@ type fsDriver struct {
multiparts *Multiparts multiparts *Multiparts
} }
// Start filesystem channel // NewDriver instantiate a new filesystem driver
func Start(root string) (chan<- string, <-chan error, drivers.Driver) { func NewDriver(root string) (drivers.Driver, error) {
ctrlChannel := make(chan string)
errorChannel := make(chan error)
fs := new(fsDriver) fs := new(fsDriver)
fs.root = root fs.root = root
fs.lock = new(sync.Mutex) fs.lock = new(sync.Mutex)
// internal related to multiparts // internal related to multiparts
fs.multiparts = new(Multiparts) fs.multiparts = new(Multiparts)
fs.multiparts.ActiveSession = make(map[string]*MultipartSession) fs.multiparts.ActiveSession = make(map[string]*MultipartSession)
go start(ctrlChannel, errorChannel, fs)
return ctrlChannel, errorChannel, fs
}
func start(ctrlChannel <-chan string, errorChannel chan<- error, fs *fsDriver) {
err := os.MkdirAll(fs.root, 0700) err := os.MkdirAll(fs.root, 0700)
errorChannel <- err return fs, err
close(errorChannel)
} }

@ -38,7 +38,8 @@ func (s *MySuite) TestAPISuite(c *C) {
path, err := ioutil.TempDir(os.TempDir(), "minio-fs-") path, err := ioutil.TempDir(os.TempDir(), "minio-fs-")
c.Check(err, IsNil) c.Check(err, IsNil)
storageList = append(storageList, path) storageList = append(storageList, path)
_, _, store := Start(path) store, err := NewDriver(path)
c.Check(err, IsNil)
return store return store
} }
drivers.APITestSuite(c, create) drivers.APITestSuite(c, create)

@ -64,13 +64,9 @@ const (
totalBuckets = 100 totalBuckets = 100
) )
// Start memory object server // NewDriver instantiate a new memory driver
func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan error, drivers.Driver) { func NewDriver(maxSize uint64, expiration time.Duration) (drivers.Driver, error) {
ctrlChannel := make(chan string) memory := new(memoryDriver)
errorChannel := make(chan error)
var memory *memoryDriver
memory = new(memoryDriver)
memory.storedBuckets = make(map[string]storedBucket) memory.storedBuckets = make(map[string]storedBucket)
memory.objects = trove.NewCache(maxSize, expiration) memory.objects = trove.NewCache(maxSize, expiration)
memory.maxSize = maxSize memory.maxSize = maxSize
@ -83,13 +79,7 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro
// set up memory expiration // set up memory expiration
memory.objects.ExpireObjects(time.Second * 5) memory.objects.ExpireObjects(time.Second * 5)
return memory, nil
go start(ctrlChannel, errorChannel)
return ctrlChannel, errorChannel, memory
}
func start(ctrlChannel <-chan string, errorChannel chan<- error) {
close(errorChannel)
} }
// GetObject - GET object from memory buffer // GetObject - GET object from memory buffer

@ -32,7 +32,8 @@ var _ = Suite(&MySuite{})
func (s *MySuite) TestAPISuite(c *C) { func (s *MySuite) TestAPISuite(c *C) {
create := func() drivers.Driver { create := func() drivers.Driver {
_, _, store := Start(1000000, 3*time.Hour) store, err := NewDriver(1000000, 3*time.Hour)
c.Check(err, IsNil)
return store return store
} }
drivers.APITestSuite(c, create) drivers.APITestSuite(c, create)

Loading…
Cancel
Save