From 32c3a558e9e14ed4475c34c189329ca7dcd04261 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Wed, 19 Oct 2016 01:19:24 +0530 Subject: [PATCH] distributed-XL: Support to run one minio process per export even on the same machine. (#2999) fixes #2983 --- cmd/benchmark-utils_test.go | 6 +- cmd/bucket-notification-handlers_test.go | 2 +- cmd/bucket-notification-utils.go | 2 +- cmd/control-mains_test.go | 2 +- cmd/control-router.go | 28 ++-- cmd/control-router_test.go | 20 +-- cmd/erasure-readfile_test.go | 6 +- cmd/event-notifier_test.go | 42 +++-- cmd/format-config-v1_test.go | 126 ++++++++++++--- cmd/fs-v1-metadata_test.go | 6 +- cmd/fs-v1_test.go | 14 +- cmd/globals.go | 2 + cmd/lock-rpc-server.go | 26 +-- cmd/lock-rpc-server_test.go | 24 +-- cmd/namespace-lock.go | 35 ++-- cmd/object-api-listobjects_test.go | 7 +- cmd/object-common.go | 86 +++++----- cmd/prepare-storage.go | 30 ++-- cmd/s3-peer-client.go | 25 +-- cmd/server-main.go | 196 +++++++++++++++++------ cmd/server-main_test.go | 114 ++++++------- cmd/storage-rpc-client.go | 22 +-- cmd/storage-rpc-client_test.go | 16 +- cmd/storage-rpc-server.go | 30 +--- cmd/test-utils_test.go | 74 ++++++--- cmd/tree-walk_test.go | 60 +++++-- cmd/utils.go | 21 ++- cmd/utils_test.go | 42 ++--- cmd/xl-v1_test.go | 30 +++- 29 files changed, 683 insertions(+), 411 deletions(-) diff --git a/cmd/benchmark-utils_test.go b/cmd/benchmark-utils_test.go index baf08e5e2..19f2eca5c 100644 --- a/cmd/benchmark-utils_test.go +++ b/cmd/benchmark-utils_test.go @@ -35,7 +35,11 @@ func prepareBenchmarkBackend(instanceType string) (ObjectLayer, []string, error) if err != nil { return nil, nil, err } - obj, _, err := initObjectLayer(disks, nil) + endpoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + return nil, nil, err + } + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { return nil, nil, err } diff --git a/cmd/bucket-notification-handlers_test.go b/cmd/bucket-notification-handlers_test.go index cb1abb13f..9b9d3696a 100644 --- a/cmd/bucket-notification-handlers_test.go +++ b/cmd/bucket-notification-handlers_test.go @@ -197,7 +197,7 @@ func testListenBucketNotificationHandler(obj ObjectLayer, instanceType string, t } globalMinioAddr = fmt.Sprintf(":%d", globalMinioPort) // initialize the peer client(s) - initGlobalS3Peers([]string{}) + initGlobalS3Peers([]storageEndPoint{}) invalidBucket := "Invalid\\Bucket" noNotificationBucket := "nonotificationbucket" diff --git a/cmd/bucket-notification-utils.go b/cmd/bucket-notification-utils.go index fa2e6341c..3b02f23aa 100644 --- a/cmd/bucket-notification-utils.go +++ b/cmd/bucket-notification-utils.go @@ -256,7 +256,7 @@ func checkDuplicateQueueConfigs(configs []queueConfig) APIErrorCode { } // Check if there are any duplicate counts. - if err := checkDuplicates(queueConfigARNS); err != nil { + if err := checkDuplicateStrings(queueConfigARNS); err != nil { errorIf(err, "Invalid queue configs found.") return ErrOverlappingConfigs } diff --git a/cmd/control-mains_test.go b/cmd/control-mains_test.go index 54c017831..346fd4a8f 100644 --- a/cmd/control-mains_test.go +++ b/cmd/control-mains_test.go @@ -73,7 +73,7 @@ func TestControlHealMain(t *testing.T) { } // Remove the object - to simulate the case where the disk was down when the object was created. - err = os.RemoveAll(path.Join(testServer.Disks[0], bucket, object)) + err = os.RemoveAll(path.Join(testServer.Disks[0].path, bucket, object)) if err != nil { t.Fatal(err) } diff --git a/cmd/control-router.go b/cmd/control-router.go index 748254b8a..76b057799 100644 --- a/cmd/control-router.go +++ b/cmd/control-router.go @@ -20,10 +20,8 @@ import ( "fmt" "net/rpc" "path" - "strings" router "github.com/gorilla/mux" - "github.com/minio/minio-go/pkg/set" ) // Routes paths for "minio control" commands. @@ -36,32 +34,26 @@ func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient { if !srvCmdConfig.isDistXL { return nil } - var newExports []string // Initialize auth rpc clients. - exports := srvCmdConfig.disks - remoteHosts := set.NewStringSet() - var remoteControlClnts []*AuthRPCClient - for _, export := range exports { + localMap := make(map[storageEndPoint]int) + for _, ep := range srvCmdConfig.endPoints { + // Set path to "" so that it is not used for filtering the + // unique entries. + ep.path = "" // Validates if remote disk is local. - if isLocalStorage(export) { + if isLocalStorage(ep) { continue } - newExports = append(newExports, export) - } - for _, export := range newExports { - var host string - if idx := strings.LastIndex(export, ":"); idx != -1 { - host = export[:idx] + if localMap[ep] == 1 { + continue } - remoteHosts.Add(fmt.Sprintf("%s:%d", host, globalMinioPort)) - } - for host := range remoteHosts { + localMap[ep]++ remoteControlClnts = append(remoteControlClnts, newAuthClient(&authConfig{ accessKey: serverConfig.GetCredential().AccessKeyID, secretKey: serverConfig.GetCredential().SecretAccessKey, secureConn: isSSL(), - address: host, + address: fmt.Sprintf("%s:%d", ep.host, ep.port), path: path.Join(reservedBucket, controlPath), loginMethod: "Control.LoginHandler", })) diff --git a/cmd/control-router_test.go b/cmd/control-router_test.go index 8520acdc4..c7282d69d 100644 --- a/cmd/control-router_test.go +++ b/cmd/control-router_test.go @@ -41,11 +41,11 @@ func TestInitRemoteControlClients(t *testing.T) { { srvCmdConfig: serverCmdConfig{ isDistXL: true, - disks: []string{ - "10.1.10.1:/mnt/disk1", - "10.1.10.1:/mnt/disk2", - "10.1.10.2:/mnt/disk3", - "10.1.10.2:/mnt/disk4", + endPoints: []storageEndPoint{ + {"10.1.10.1", 9000, "/mnt/disk1"}, + {"10.1.10.1", 9000, "/mnt/disk2"}, + {"10.1.10.2", 9000, "/mnt/disk1"}, + {"10.1.10.2", 9000, "/mnt/disk2"}, }, }, totalClients: 2, @@ -54,11 +54,11 @@ func TestInitRemoteControlClients(t *testing.T) { { srvCmdConfig: serverCmdConfig{ isDistXL: true, - disks: []string{ - "10.1.10.1:/mnt/disk1", - "10.1.10.2:/mnt/disk2", - "10.1.10.3:/mnt/disk3", - "10.1.10.4:/mnt/disk4", + endPoints: []storageEndPoint{ + {"10.1.10.1", 9000, "/mnt/disk1"}, + {"10.1.10.2", 9000, "/mnt/disk2"}, + {"10.1.10.3", 9000, "/mnt/disk3"}, + {"10.1.10.4", 9000, "/mnt/disk4"}, }, }, totalClients: 4, diff --git a/cmd/erasure-readfile_test.go b/cmd/erasure-readfile_test.go index 37a2ed83a..6bfd3d9d2 100644 --- a/cmd/erasure-readfile_test.go +++ b/cmd/erasure-readfile_test.go @@ -222,7 +222,11 @@ func TestErasureReadUtils(t *testing.T) { if err != nil { t.Fatal(err) } - objLayer, _, err := initObjectLayer(disks, nil) + endpoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatal(err) + } + objLayer, _, err := initObjectLayer(endpoints, nil) if err != nil { removeRoots(disks) t.Fatal(err) diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index 7f17d7d6b..b88639171 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -35,12 +35,16 @@ func TestInitEventNotifierFaultyDisks(t *testing.T) { // remove the root folder after the test ends. defer removeAll(rootPath) - disk, err := getRandomDisks(1) + disks, err := getRandomDisks(1) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } - defer removeAll(disk[0]) - obj, _, err := initObjectLayer(disk, nil) + defer removeAll(disks[0]) + endpoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatal(err) + } + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } @@ -85,12 +89,16 @@ func TestInitEventNotifierWithAMQP(t *testing.T) { // remove the root folder after the test ends. defer removeAll(rootPath) - disk, err := getRandomDisks(1) - defer removeAll(disk[0]) + disks, err := getRandomDisks(1) + defer removeAll(disks[0]) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } - fs, _, err := initObjectLayer(disk, nil) + endpoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatal(err) + } + fs, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } @@ -112,12 +120,16 @@ func TestInitEventNotifierWithElasticSearch(t *testing.T) { // remove the root folder after the test ends. defer removeAll(rootPath) - disk, err := getRandomDisks(1) - defer removeAll(disk[0]) + disks, err := getRandomDisks(1) + defer removeAll(disks[0]) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } - fs, _, err := initObjectLayer(disk, nil) + endpoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatal(err) + } + fs, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } @@ -139,12 +151,16 @@ func TestInitEventNotifierWithRedis(t *testing.T) { // remove the root folder after the test ends. defer removeAll(rootPath) - disk, err := getRandomDisks(1) - defer removeAll(disk[0]) + disks, err := getRandomDisks(1) + defer removeAll(disks[0]) if err != nil { t.Fatal("Unable to create directories for FS backend. ", err) } - fs, _, err := initObjectLayer(disk, nil) + endpoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatal(err) + } + fs, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatal("Unable to initialize FS backend.", err) } @@ -184,7 +200,7 @@ func (s *TestPeerRPCServerData) TearDown() { s.testServer.Stop() _ = removeAll(s.testServer.Root) for _, d := range s.testServer.Disks { - _ = removeAll(d) + _ = removeAll(d.path) } } diff --git a/cmd/format-config-v1_test.go b/cmd/format-config-v1_test.go index b1cfbf482..dd0b4183c 100644 --- a/cmd/format-config-v1_test.go +++ b/cmd/format-config-v1_test.go @@ -275,8 +275,12 @@ func TestFormatXLHealFreshDisks(t *testing.T) { if err != nil { t.Fatal(err) } + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } // Create an instance of xl backend. - obj, _, err := initObjectLayer(fsDirs, nil) + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Error(err) } @@ -307,8 +311,12 @@ func TestFormatXLHealFreshDisksErrorExpected(t *testing.T) { if err != nil { t.Fatal(err) } + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } // Create an instance of xl backend. - obj, _, err := initObjectLayer(fsDirs, nil) + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Error(err) } @@ -592,8 +600,12 @@ func TestInitFormatXLErrors(t *testing.T) { t.Fatal(err) } defer removeRoots(fsDirs) + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } // Create an instance of xl backend. - obj, _, err := initObjectLayer(fsDirs, nil) + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -694,8 +706,12 @@ func TestLoadFormatXLErrs(t *testing.T) { } defer removeRoots(fsDirs) + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } // Create an instance of xl backend. - obj, _, err := initObjectLayer(fsDirs, nil) + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -721,7 +737,11 @@ func TestLoadFormatXLErrs(t *testing.T) { } defer removeRoots(fsDirs) - obj, _, err = initObjectLayer(fsDirs, nil) + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -745,7 +765,11 @@ func TestLoadFormatXLErrs(t *testing.T) { } defer removeRoots(fsDirs) - obj, _, err = initObjectLayer(fsDirs, nil) + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -767,7 +791,11 @@ func TestLoadFormatXLErrs(t *testing.T) { } defer removeRoots(fsDirs) - obj, _, err = initObjectLayer(fsDirs, nil) + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -790,8 +818,13 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // Everything is fine, should return nil - obj, _, err := initObjectLayer(fsDirs, nil) + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -807,8 +840,13 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // Disks 0..15 are nil - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -826,8 +864,13 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // One disk returns Faulty Disk - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -847,8 +890,13 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // One disk is not found, heal corrupted disks should return nil - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -864,8 +912,13 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // Remove format.json of all disks - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -885,8 +938,13 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // Corrupted format json in one disk - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -910,8 +968,13 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // Everything is fine, should return nil - obj, _, err := initObjectLayer(fsDirs, nil) + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -926,8 +989,13 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // Disks 0..15 are nil - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -945,8 +1013,13 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // One disk returns Faulty Disk - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -966,8 +1039,13 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // One disk is not found, heal corrupted disks should return nil - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -983,8 +1061,13 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // Remove format.json of all disks - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } @@ -1004,8 +1087,13 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) { t.Fatal(err) } + endpoints, err = parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal(err) + } + // Remove format.json of all disks - obj, _, err = initObjectLayer(fsDirs, nil) + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal(err) } diff --git a/cmd/fs-v1-metadata_test.go b/cmd/fs-v1-metadata_test.go index 97859068f..bc9cf726c 100644 --- a/cmd/fs-v1-metadata_test.go +++ b/cmd/fs-v1-metadata_test.go @@ -68,7 +68,11 @@ func TestHasExtendedHeader(t *testing.T) { } func initFSObjects(disk string, t *testing.T) (obj ObjectLayer) { - obj, _, err := initObjectLayer([]string{disk}, nil) + endpoints, err := parseStorageEndPoints([]string{disk}, 0) + if err != nil { + t.Fatal(err) + } + obj, _, err = initObjectLayer(endpoints, nil) if err != nil { t.Fatal("Unexpected err: ", err) } diff --git a/cmd/fs-v1_test.go b/cmd/fs-v1_test.go index 1fe73bbda..a03e7e45d 100644 --- a/cmd/fs-v1_test.go +++ b/cmd/fs-v1_test.go @@ -40,12 +40,22 @@ func TestNewFS(t *testing.T) { disks = append(disks, xlDisk) } - fsStorageDisks, err := initStorageDisks([]string{disk}, nil) + endpoints, err := parseStorageEndPoints([]string{disk}, 0) if err != nil { t.Fatal("Uexpected error: ", err) } - xlStorageDisks, err := initStorageDisks(disks, nil) + fsStorageDisks, err := initStorageDisks(endpoints, nil) + if err != nil { + t.Fatal("Uexpected error: ", err) + } + + endpoints, err = parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatal("Uexpected error: ", err) + } + + xlStorageDisks, err := initStorageDisks(endpoints, nil) if err != nil { t.Fatal("Uexpected error: ", err) } diff --git a/cmd/globals.go b/cmd/globals.go index 2e03a0680..46f0929dc 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -57,6 +57,8 @@ var ( globalMinioAddr = "" // Minio default port, can be changed through command line. globalMinioPort = 9000 + // Holds the host that was passed using --address + globalMinioHost = "" // Peer communication struct globalS3Peers = s3Peers{} diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index fbe49c710..a1aacdedf 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -21,12 +21,10 @@ import ( "math/rand" "net/rpc" "path" - "strings" "sync" "time" router "github.com/gorilla/mux" - "github.com/minio/minio-go/pkg/set" ) const lockRPCPath = "/minio/lock" @@ -84,31 +82,19 @@ func registerDistNSLockRouter(mux *router.Router, serverConfig serverCmdConfig) // Create one lock server for every local storage rpc server. func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) { - // Initialize posix storage API. - exports := serverConfig.disks - ignoredExports := serverConfig.ignoredDisks - - // Save ignored disks in a map - // Initialize ignored disks in a new set. - ignoredSet := set.NewStringSet() - if len(ignoredExports) > 0 { - ignoredSet = set.CreateStringSet(ignoredExports...) - } - for _, export := range exports { - if ignoredSet.Contains(export) { - // Ignore initializing ignored export. + for _, ep := range serverConfig.endPoints { + if ep.presentIn(serverConfig.ignoredEndPoints) { + // Skip initializing ignored end point. continue } + // Not local storage move to the next node. - if !isLocalStorage(export) { + if !isLocalStorage(ep) { continue } - if idx := strings.LastIndex(export, ":"); idx != -1 { - export = export[idx+1:] - } // Create handler for lock RPCs locker := &lockServer{ - rpcPath: export, + rpcPath: ep.path, mutex: sync.Mutex{}, lockMap: make(map[string][]lockRequesterInfo), } diff --git a/cmd/lock-rpc-server_test.go b/cmd/lock-rpc-server_test.go index 0215c3500..0aab1acbd 100644 --- a/cmd/lock-rpc-server_test.go +++ b/cmd/lock-rpc-server_test.go @@ -452,11 +452,11 @@ func TestLockServers(t *testing.T) { { srvCmdConfig: serverCmdConfig{ isDistXL: true, - disks: []string{ - "localhost:/mnt/disk1", - "1.1.1.2:/mnt/disk2", - "1.1.2.1:/mnt/disk3", - "1.1.2.2:/mnt/disk4", + endPoints: []storageEndPoint{ + {"localhost", 9000, "/mnt/disk1"}, + {"1.1.1.2", 9000, "/mnt/disk2"}, + {"1.1.2.1", 9000, "/mnt/disk3"}, + {"1.1.2.2", 9000, "/mnt/disk4"}, }, }, totalLockServers: 1, @@ -465,14 +465,14 @@ func TestLockServers(t *testing.T) { { srvCmdConfig: serverCmdConfig{ isDistXL: true, - disks: []string{ - "localhost:/mnt/disk1", - "localhost:/mnt/disk2", - "1.1.2.1:/mnt/disk3", - "1.1.2.2:/mnt/disk4", + endPoints: []storageEndPoint{ + {"localhost", 9000, "/mnt/disk1"}, + {"localhost", 9000, "/mnt/disk2"}, + {"1.1.2.1", 9000, "/mnt/disk3"}, + {"1.1.2.2", 9000, "/mnt/disk4"}, }, - ignoredDisks: []string{ - "localhost:/mnt/disk2", + ignoredEndPoints: []storageEndPoint{ + {"localhost", 9000, "/mnt/disk2"}, }, }, totalLockServers: 1, diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index f2c6b2c8c..50f7e5e1d 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -21,7 +21,6 @@ import ( pathutil "path" "runtime" "strconv" - "strings" "sync" "github.com/minio/dsync" @@ -32,27 +31,27 @@ var nsMutex *nsLockMap // Initialize distributed locking only in case of distributed setup. // Returns if the setup is distributed or not on success. -func initDsyncNodes(disks []string, port int) error { - serverPort := strconv.Itoa(port) +func initDsyncNodes(eps []storageEndPoint) error { cred := serverConfig.GetCredential() // Initialize rpc lock client information only if this instance is a distributed setup. var clnts []dsync.RPC myNode := -1 - for _, disk := range disks { - if idx := strings.LastIndex(disk, ":"); idx != -1 { - clnts = append(clnts, newAuthClient(&authConfig{ - accessKey: cred.AccessKeyID, - secretKey: cred.SecretAccessKey, - // Construct a new dsync server addr. - secureConn: isSSL(), - address: disk[:idx] + ":" + serverPort, - // Construct a new rpc path for the disk. - path: pathutil.Join(lockRPCPath, disk[idx+1:]), - loginMethod: "Dsync.LoginHandler", - })) - if isLocalStorage(disk) && myNode == -1 { - myNode = len(clnts) - 1 - } + for _, ep := range eps { + if ep.host == "" || ep.port == 0 || ep.path == "" { + return errInvalidArgument + } + clnts = append(clnts, newAuthClient(&authConfig{ + accessKey: cred.AccessKeyID, + secretKey: cred.SecretAccessKey, + // Construct a new dsync server addr. + secureConn: isSSL(), + address: ep.host + ":" + strconv.Itoa(ep.port), + // Construct a new rpc path for the endpoint. + path: pathutil.Join(lockRPCPath, ep.path), + loginMethod: "Dsync.LoginHandler", + })) + if isLocalStorage(ep) && myNode == -1 { + myNode = len(clnts) - 1 } } return dsync.SetNodesWithClients(clnts, myNode) diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index 51cc1439d..e186b1b66 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -565,7 +565,12 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) { } func initFSObjectsB(disk string, t *testing.B) (obj ObjectLayer) { - storageDisks, err := initStorageDisks([]string{disk}, nil) + endPoints, err := parseStorageEndPoints([]string{disk}, 0) + if err != nil { + t.Fatal("Unexpected err: ", err) + } + + storageDisks, err := initStorageDisks(endPoints, nil) if err != nil { t.Fatal("Unexpected err: ", err) } diff --git a/cmd/object-common.go b/cmd/object-common.go index c043fa697..767e9892e 100644 --- a/cmd/object-common.go +++ b/cmd/object-common.go @@ -64,7 +64,11 @@ func houseKeeping(storageDisks []StorageAPI) error { // Initialize all disks in parallel. for index, disk := range storageDisks { - if disk == nil || !isLocalStorage(disk.String()) { + if disk == nil { + continue + } + if _, ok := disk.(*networkStorage); ok { + // Skip remote disks. continue } wg.Add(1) @@ -100,59 +104,57 @@ func houseKeeping(storageDisks []StorageAPI) error { } // Check if a network path is local to this node. -func isLocalStorage(networkPath string) bool { - if idx := strings.LastIndex(networkPath, ":"); idx != -1 { - // e.g 10.0.0.1:/mnt/networkPath - netAddr, _, err := splitNetPath(networkPath) - if err != nil { - errorIf(err, "Splitting into ip and path failed") - return false - } - // netAddr will only be set if this is not a local path. - if netAddr == "" { +func isLocalStorage(ep storageEndPoint) bool { + if ep.host == "" { + return true + } + if globalMinioHost != "" { + // if --address host:port was specified for distXL we short circuit only the endPoint + // that matches host:port + if globalMinioHost == ep.host && globalMinioPort == ep.port { return true } - // Resolve host to address to check if the IP is loopback. - // If address resolution fails, assume it's a non-local host. - addrs, err := net.LookupHost(netAddr) - if err != nil { - errorIf(err, "Failed to lookup host") - return false + return false + } + // Resolve host to address to check if the IP is loopback. + // If address resolution fails, assume it's a non-local host. + addrs, err := net.LookupHost(ep.host) + if err != nil { + errorIf(err, "Failed to lookup host") + return false + } + for _, addr := range addrs { + if ip := net.ParseIP(addr); ip.IsLoopback() { + return true } - for _, addr := range addrs { - if ip := net.ParseIP(addr); ip.IsLoopback() { + } + iaddrs, err := net.InterfaceAddrs() + if err != nil { + errorIf(err, "Unable to list interface addresses") + return false + } + for _, addr := range addrs { + for _, iaddr := range iaddrs { + ip, _, err := net.ParseCIDR(iaddr.String()) + if err != nil { + errorIf(err, "Unable to parse CIDR") + return false + } + if ip.String() == addr { return true } - } - iaddrs, err := net.InterfaceAddrs() - if err != nil { - errorIf(err, "Unable to list interface addresses") - return false - } - for _, addr := range addrs { - for _, iaddr := range iaddrs { - ip, _, err := net.ParseCIDR(iaddr.String()) - if err != nil { - errorIf(err, "Unable to parse CIDR") - return false - } - if ip.String() == addr { - return true - } - } } - return false } - return true + return false } // Depending on the disk type network or local, initialize storage API. -func newStorageAPI(disk string) (storage StorageAPI, err error) { - if isLocalStorage(disk) { - return newPosix(disk) +func newStorageAPI(ep storageEndPoint) (storage StorageAPI, err error) { + if isLocalStorage(ep) { + return newPosix(ep.path) } - return newRPCClient(disk) + return newRPCClient(ep) } // Initializes meta volume on all input storage disks. diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 5832908e2..134ac31d6 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -20,7 +20,6 @@ import ( "time" "github.com/minio/mc/pkg/console" - "github.com/minio/minio-go/pkg/set" ) // Channel where minioctl heal handler would notify if it were successful. This @@ -250,34 +249,35 @@ func retryFormattingDisks(firstDisk bool, firstEndpoint string, storageDisks []S } // Initialize storage disks based on input arguments. -func initStorageDisks(disks, ignoredDisks []string) ([]StorageAPI, error) { +func initStorageDisks(endPoints, ignoredEndPoints []storageEndPoint) ([]StorageAPI, error) { // Single disk means we will use FS backend. - if len(disks) == 1 { - storage, err := newStorageAPI(disks[0]) + if len(endPoints) == 1 { + storage, err := newStorageAPI(endPoints[0]) if err != nil && err != errDiskNotFound { return nil, err } return []StorageAPI{storage}, nil - } // Otherwise proceed with XL setup. - if err := checkSufficientDisks(disks); err != nil { - return nil, err - } - disksSet := set.NewStringSet() - if len(ignoredDisks) > 0 { - disksSet = set.CreateStringSet(ignoredDisks...) } + // Otherwise proceed with XL setup. // Bootstrap disks. - storageDisks := make([]StorageAPI, len(disks)) - for index, disk := range disks { + storageDisks := make([]StorageAPI, len(endPoints)) + for index, ep := range endPoints { // Check if disk is ignored. - if disksSet.Contains(disk) { + ignored := false + for _, iep := range ignoredEndPoints { + if ep == iep { + ignored = true + break + } + } + if ignored { // Set this situation as disk not found. storageDisks[index] = nil continue } // Intentionally ignore disk not found errors. XL is designed // to handle these errors internally. - storage, err := newStorageAPI(disk) + storage, err := newStorageAPI(ep) if err != nil && err != errDiskNotFound { return nil, err } diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go index 51238c747..f950fc240 100644 --- a/cmd/s3-peer-client.go +++ b/cmd/s3-peer-client.go @@ -23,8 +23,6 @@ import ( "path" "sync" "time" - - "github.com/minio/minio-go/pkg/set" ) type s3Peers struct { @@ -38,9 +36,9 @@ type s3Peers struct { peers []string } -func initGlobalS3Peers(disks []string) { +func initGlobalS3Peers(eps []storageEndPoint) { // Get list of de-duplicated peers. - peers := getAllPeers(disks) + peers := getAllPeers(eps) // Initialize global state. globalS3Peers = s3Peers{ @@ -112,23 +110,10 @@ func (s3p *s3Peers) Close() error { // returns the network addresses of all Minio servers in the cluster // in `host:port` format. -func getAllPeers(disks []string) []string { +func getAllPeers(eps []storageEndPoint) []string { res := []string{} - // use set to de-duplicate - sset := set.NewStringSet() - for _, disk := range disks { - netAddr, _, err := splitNetPath(disk) - if err != nil || netAddr == "" { - errorIf(err, "Unexpected error - most likely a bug.") - continue - } - if !sset.Contains(netAddr) { - res = append( - res, - fmt.Sprintf("%s:%d", netAddr, globalMinioPort), - ) - sset.Add(netAddr) - } + for _, ep := range eps { + res = append(res, fmt.Sprintf("%s:%d", ep.host, ep.port)) } return res } diff --git a/cmd/server-main.go b/cmd/server-main.go index f37fa22bf..b161e9cd0 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -17,14 +17,18 @@ package cmd import ( + "errors" "fmt" "net" "net/http" "os" + "runtime" "strconv" "strings" "time" + "regexp" + "github.com/minio/cli" ) @@ -96,11 +100,118 @@ EXAMPLES: } type serverCmdConfig struct { - serverAddr string - disks []string - ignoredDisks []string - isDistXL bool // True only if its distributed XL. - storageDisks []StorageAPI + serverAddr string + endPoints []storageEndPoint + ignoredEndPoints []storageEndPoint + isDistXL bool // True only if its distributed XL. + storageDisks []StorageAPI +} + +// End point is specified in the command line as host:port:path or host:path or path +// host:port:path or host:path - for distributed XL. Default port is 9000. +// just path - for single node XL or FS. +type storageEndPoint struct { + host string // Will be empty for single node XL and FS + port int // Will be valid for distributed XL + path string // Will be valid for all configs +} + +// Returns string form. +func (ep storageEndPoint) String() string { + var str []string + if ep.host != "" { + str = append(str, ep.host) + } + if ep.port != 0 { + str = append(str, strconv.Itoa(ep.port)) + } + if ep.path != "" { + str = append(str, ep.path) + } + return strings.Join(str, ":") +} + +// Returns if ep is present in the eps list. +func (ep storageEndPoint) presentIn(eps []storageEndPoint) bool { + for _, entry := range eps { + if entry == ep { + return true + } + } + return false +} + +// Parse end-point (of the form host:port:path or host:path or path) +func parseStorageEndPoint(ep string, defaultPort int) (storageEndPoint, error) { + if runtime.GOOS == "windows" { + // Try to match path, ex. C:\export + matched, err := regexp.MatchString(`^[a-zA-Z]:\\[^:]+$`, ep) + if err != nil { + return storageEndPoint{}, err + } + if matched { + return storageEndPoint{path: ep}, nil + } + + // Try to match host:path ex. 127.0.0.1:C:\export + re, err := regexp.Compile(`^([^:]+):([a-zA-Z]:\\[^:]+)$`) + if err != nil { + return storageEndPoint{}, err + } + result := re.FindStringSubmatch(ep) + if len(result) != 0 { + return storageEndPoint{host: result[1], port: defaultPort, path: result[2]}, nil + } + + // Try to match host:port:path ex. 127.0.0.1:443:C:\export + re, err = regexp.Compile(`^([^:]+):([0-9]+):([a-zA-Z]:\\[^:]+)$`) + if err != nil { + return storageEndPoint{}, err + } + result = re.FindStringSubmatch(ep) + if len(result) != 0 { + portInt, err := strconv.Atoi(result[2]) + if err != nil { + return storageEndPoint{}, err + } + return storageEndPoint{host: result[1], port: portInt, path: result[3]}, nil + } + return storageEndPoint{}, errors.New("Unable to parse endpoint " + ep) + } + // For *nix OSes + parts := strings.Split(ep, ":") + var parsedep storageEndPoint + switch len(parts) { + case 1: + parsedep = storageEndPoint{path: parts[0]} + case 2: + parsedep = storageEndPoint{host: parts[0], port: defaultPort, path: parts[1]} + case 3: + port, err := strconv.Atoi(parts[1]) + if err != nil { + return storageEndPoint{}, err + } + parsedep = storageEndPoint{host: parts[0], port: port, path: parts[2]} + default: + return storageEndPoint{}, errors.New("Unable to parse " + ep) + } + return parsedep, nil +} + +// Parse an array of end-points (passed on the command line) +func parseStorageEndPoints(eps []string, defaultPort int) (endpoints []storageEndPoint, err error) { + for _, ep := range eps { + if ep == "" { + continue + } + var endpoint storageEndPoint + endpoint, err = parseStorageEndPoint(ep, defaultPort) + if err != nil { + return nil, err + } + endpoints = append(endpoints, endpoint) + } + return endpoints, nil } // getListenIPs - gets all the ips to listen on. @@ -194,13 +305,13 @@ func initServerConfig(c *cli.Context) { } // Validate if input disks are sufficient for initializing XL. -func checkSufficientDisks(disks []string) error { +func checkSufficientDisks(eps []storageEndPoint) error { // Verify total number of disks. - totalDisks := len(disks) - if totalDisks > maxErasureBlocks { + total := len(eps) + if total > maxErasureBlocks { return errXLMaxDisks } - if totalDisks < minErasureBlocks { + if total < minErasureBlocks { return errXLMinDisks } @@ -211,7 +322,7 @@ func checkSufficientDisks(disks []string) error { // Verify if we have even number of disks. // only combination of 4, 6, 8, 10, 12, 14, 16 are supported. - if !isEven(totalDisks) { + if !isEven(total) { return errXLNumDisks } @@ -219,36 +330,19 @@ func checkSufficientDisks(disks []string) error { return nil } -// Validates if disks are of supported format, invalid arguments are rejected. -func checkNamingDisks(disks []string) error { - for _, disk := range disks { - _, _, err := splitNetPath(disk) - if err != nil { - return err - } - } - return nil -} - // Validate input disks. -func validateDisks(disks []string, ignoredDisks []string) []StorageAPI { - isXL := len(disks) > 1 +func validateDisks(endPoints []storageEndPoint, ignoredEndPoints []storageEndPoint) []StorageAPI { + isXL := len(endPoints) > 1 if isXL { // Validate if input disks have duplicates in them. - err := checkDuplicates(disks) + err := checkDuplicateEndPoints(endPoints) fatalIf(err, "Invalid disk arguments for server.") // Validate if input disks are sufficient for erasure coded setup. - err = checkSufficientDisks(disks) - fatalIf(err, "Invalid disk arguments for server.") - - // Validate if input disks are properly named in accordance with either - // - /mnt/disk1 - // - ip:/mnt/disk1 - err = checkNamingDisks(disks) + err = checkSufficientDisks(endPoints) fatalIf(err, "Invalid disk arguments for server.") } - storageDisks, err := initStorageDisks(disks, ignoredDisks) + storageDisks, err := initStorageDisks(endPoints, ignoredEndPoints) fatalIf(err, "Unable to initialize storage disks.") return storageDisks } @@ -273,10 +367,10 @@ func getPort(address string) int { } // Returns if slice of disks is a distributed setup. -func isDistributedSetup(disks []string) (isDist bool) { +func isDistributedSetup(eps []storageEndPoint) (isDist bool) { // Port to connect to for the lock servers in a distributed setup. - for _, disk := range disks { - if !isLocalStorage(disk) { + for _, ep := range eps { + if !isLocalStorage(ep) { // One or more disks supplied as arguments are not // attached to the local node. isDist = true @@ -295,17 +389,25 @@ func serverMain(c *cli.Context) { serverAddr := c.String("address") // Check if requested port is available. - port := getPort(serverAddr) - fatalIf(checkPortAvailability(port), "Port unavailable %d", port) + host, portStr, err := net.SplitHostPort(serverAddr) + fatalIf(err, "Unable to parse %s.", serverAddr) + + portInt, err := strconv.Atoi(portStr) + fatalIf(err, "Invalid port number.") + + fatalIf(checkPortAvailability(portInt), "Port unavailable %d", portInt) - // Saves port in a globally accessible value. - globalMinioPort = port + // Saves host and port in a globally accessible value. + globalMinioPort = portInt + globalMinioHost = host // Disks to be ignored in server init, to skip format healing. - ignoredDisks := strings.Split(c.String("ignore-disks"), ",") + ignoredDisks, err := parseStorageEndPoints(strings.Split(c.String("ignore-disks"), ","), portInt) + fatalIf(err, "Unable to parse storage endpoints %s", strings.Split(c.String("ignore-disks"), ",")) // Disks to be used in server init. - disks := c.Args() + disks, err := parseStorageEndPoints(c.Args(), portInt) + fatalIf(err, "Unable to parse storage endpoints %s", c.Args()) // Initialize server config. initServerConfig(c) @@ -324,11 +426,11 @@ func serverMain(c *cli.Context) { // Configure server. srvConfig := serverCmdConfig{ - serverAddr: serverAddr, - disks: disks, - ignoredDisks: ignoredDisks, - storageDisks: storageDisks, - isDistXL: isDistributedSetup(disks), + serverAddr: serverAddr, + endPoints: disks, + ignoredEndPoints: ignoredDisks, + storageDisks: storageDisks, + isDistXL: isDistributedSetup(disks), } // Configure server. @@ -337,7 +439,7 @@ func serverMain(c *cli.Context) { // Set nodes for dsync for distributed setup. if srvConfig.isDistXL { - fatalIf(initDsyncNodes(disks, port), "Unable to initialize distributed locking") + fatalIf(initDsyncNodes(disks), "Unable to initialize distributed locking") } // Initialize name space lock. diff --git a/cmd/server-main_test.go b/cmd/server-main_test.go index 360646308..f212482b0 100644 --- a/cmd/server-main_test.go +++ b/cmd/server-main_test.go @@ -18,7 +18,6 @@ package cmd import ( "flag" - "net" "net/http" "os" "path/filepath" @@ -64,23 +63,47 @@ func TestFinalizeEndpoints(t *testing.T) { // Tests all the expected input disks for function checkSufficientDisks. func TestCheckSufficientDisks(t *testing.T) { - xlDisks := []string{ - "/mnt/backend1", - "/mnt/backend2", - "/mnt/backend3", - "/mnt/backend4", - "/mnt/backend5", - "/mnt/backend6", - "/mnt/backend7", - "/mnt/backend8", - "/mnt/backend9", - "/mnt/backend10", - "/mnt/backend11", - "/mnt/backend12", - "/mnt/backend13", - "/mnt/backend14", - "/mnt/backend15", - "/mnt/backend16", + var xlDisks []string + if runtime.GOOS == "windows" { + xlDisks = []string{ + "C:\\mnt\\backend1", + "C:\\mnt\\backend2", + "C:\\mnt\\backend3", + "C:\\mnt\\backend4", + "C:\\mnt\\backend5", + "C:\\mnt\\backend6", + "C:\\mnt\\backend7", + "C:\\mnt\\backend8", + "C:\\mnt\\backend9", + "C:\\mnt\\backend10", + "C:\\mnt\\backend11", + "C:\\mnt\\backend12", + "C:\\mnt\\backend13", + "C:\\mnt\\backend14", + "C:\\mnt\\backend15", + "C:\\mnt\\backend16", + "C:\\mnt\\backend17", + } + } else { + xlDisks = []string{ + "/mnt/backend1", + "/mnt/backend2", + "/mnt/backend3", + "/mnt/backend4", + "/mnt/backend5", + "/mnt/backend6", + "/mnt/backend7", + "/mnt/backend8", + "/mnt/backend9", + "/mnt/backend10", + "/mnt/backend11", + "/mnt/backend12", + "/mnt/backend13", + "/mnt/backend14", + "/mnt/backend15", + "/mnt/backend16", + "/mnt/backend17", + } } // List of test cases fo sufficient disk verification. testCases := []struct { @@ -104,7 +127,7 @@ func TestCheckSufficientDisks(t *testing.T) { }, // Larger than maximum number of disks > 16. { - append(xlDisks[0:16], "/mnt/unsupported"), + xlDisks, errXLMaxDisks, }, // Lesser than minimum number of disks < 6. @@ -121,43 +144,12 @@ func TestCheckSufficientDisks(t *testing.T) { // Validates different variations of input disks. for i, testCase := range testCases { - if checkSufficientDisks(testCase.disks) != testCase.expectedErr { - t.Errorf("Test %d expected to pass for disks %s", i+1, testCase.disks) - } - } -} - -func TestCheckNamingDisks(t *testing.T) { - var testCases []struct { - disks []string - err error - } - if runtime.GOOS == "windows" { - testCases = []struct { - disks []string - err error - }{ - {[]string{`:C:\\a\\b\\c`}, &net.AddrError{Err: "Missing address in network path", Addr: `:C:\\a\\b\\c`}}, - {[]string{`localhost:C:\\mnt\\disk1`, `localhost:C:\\mnt\\disk2`}, nil}, - } - } else { - testCases = []struct { - disks []string - err error - }{ - {[]string{"localhost:/mnt/disk1", ":/a/b/c"}, &net.AddrError{Err: "Missing address in network path", Addr: ":/a/b/c"}}, - {[]string{"localhost:/mnt/disk1", "localhost:/mnt/disk2"}, nil}, + endpoints, err := parseStorageEndPoints(testCase.disks, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) } - } - - for i, test := range testCases { - err := checkNamingDisks(test.disks) - if test.err != nil { - if err == nil || err.Error() != test.err.Error() { - t.Errorf("Test %d failed with %v but expected error %v", i+1, err, test.err) - } - } else if err != test.err { - t.Errorf("Test %d failed with %v but expected error %v", i+1, err, test.err) + if checkSufficientDisks(endpoints) != testCase.expectedErr { + t.Errorf("Test %d expected to pass for disks %s", i+1, testCase.disks) } } } @@ -186,7 +178,11 @@ func TestCheckServerSyntax(t *testing.T) { t.Errorf("Test %d failed to parse arguments %s", i+1, disks) } defer removeRoots(disks) - _ = validateDisks(disks, nil) + endpoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } + _ = validateDisks(endpoints, nil) } } @@ -268,7 +264,11 @@ func TestIsDistributedSetup(t *testing.T) { } for i, test := range testCases { - res := isDistributedSetup(test.disks) + endpoints, err := parseStorageEndPoints(test.disks, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } + res := isDistributedSetup(endpoints) if res != test.result { t.Errorf("Test %d: expected result %t but received %t", i+1, test.result, res) } diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index bfaccaad9..eddbbbaa4 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -17,12 +17,11 @@ package cmd import ( + "fmt" "io" "net" "net/rpc" "path" - "strconv" - "strings" "github.com/minio/minio/pkg/disk" ) @@ -94,21 +93,16 @@ func toStorageErr(err error) error { } // Initialize new rpc client. -func newRPCClient(networkPath string) (StorageAPI, error) { +func newRPCClient(ep storageEndPoint) (StorageAPI, error) { // Input validation. - if networkPath == "" || strings.LastIndex(networkPath, ":") == -1 { + if ep.host == "" || ep.port == 0 || ep.path == "" { return nil, errInvalidArgument } - // Split network path into its components. - netAddr, netPath, err := splitNetPath(networkPath) - if err != nil { - return nil, err - } - // Dial minio rpc storage http path. - rpcPath := path.Join(storageRPCPath, netPath) - rpcAddr := netAddr + ":" + strconv.Itoa(globalMinioPort) + rpcPath := path.Join(storageRPCPath, ep.path) + rpcAddr := fmt.Sprintf("%s:%d", ep.host, ep.port) + // Initialize rpc client with network address and rpc path. cred := serverConfig.GetCredential() rpcClient := newAuthClient(&authConfig{ @@ -122,8 +116,8 @@ func newRPCClient(networkPath string) (StorageAPI, error) { // Initialize network storage. ndisk := &networkStorage{ - netAddr: netAddr, - netPath: netPath, + netAddr: ep.host, + netPath: ep.path, rpcClient: rpcClient, } diff --git a/cmd/storage-rpc-client_test.go b/cmd/storage-rpc-client_test.go index 6126968f5..9da47f41c 100644 --- a/cmd/storage-rpc-client_test.go +++ b/cmd/storage-rpc-client_test.go @@ -24,8 +24,6 @@ import ( "net" "net/rpc" "runtime" - "strconv" - "strings" "testing" ) @@ -144,14 +142,14 @@ type TestRPCStorageSuite struct { // Starting the Test server with temporary FS backend. func (s *TestRPCStorageSuite) SetUpSuite(c *testing.T) { s.testServer = StartTestStorageRPCServer(c, s.serverType, 1) - splitAddrs := strings.Split(s.testServer.Server.Listener.Addr().String(), ":") - var err error - globalMinioPort, err = strconv.Atoi(splitAddrs[1]) - if err != nil { - c.Fatalf("Unable to convert %s to its integer representation, %s", splitAddrs[1], err) - } + listenAddress := s.testServer.Server.Listener.Addr().String() + for _, disk := range s.testServer.Disks { - storageDisk, err := newRPCClient(splitAddrs[0] + ":" + disk) + remoteEndPoint, err := parseStorageEndPoint(listenAddress+":"+disk.path, 0) + if err != nil { + c.Fatalf("Unexpected error %s", err) + } + storageDisk, err := newRPCClient(remoteEndPoint) if err != nil { c.Fatal("Unable to initialize RPC client", err) } diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index 8098749ea..0e00840b0 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -21,11 +21,9 @@ import ( "io" "net/rpc" "path" - "strings" "time" router "github.com/gorilla/mux" - "github.com/minio/minio-go/pkg/set" "github.com/minio/minio/pkg/disk" ) @@ -220,36 +218,20 @@ func (s *storageServer) TryInitHandler(args *GenericArgs, reply *GenericReply) e // Initialize new storage rpc. func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err error) { - // Initialize posix storage API. - exports := serverConfig.disks - ignoredExports := serverConfig.ignoredDisks - - // Initialize ignored disks in a new set. - ignoredSet := set.NewStringSet() - if len(ignoredExports) > 0 { - ignoredSet = set.CreateStringSet(ignoredExports...) - } - for _, export := range exports { - if ignoredSet.Contains(export) { - // Ignore initializing ignored export. + for _, ep := range serverConfig.endPoints { + if ep.presentIn(serverConfig.ignoredEndPoints) { + // Do not init ignored end point. continue } // e.g server:/mnt/disk1 - if isLocalStorage(export) { - if idx := strings.LastIndex(export, ":"); idx != -1 { - export = export[idx+1:] - } - var storage StorageAPI - storage, err = newPosix(export) + if isLocalStorage(ep) { + storage, err := newPosix(ep.path) if err != nil && err != errDiskNotFound { return nil, err } - if idx := strings.LastIndex(export, ":"); idx != -1 { - export = export[idx+1:] - } servers = append(servers, &storageServer{ storage: storage, - path: export, + path: ep.path, }) } } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index b5fad4a5c..a04cc0f74 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -61,7 +61,11 @@ func prepareFS() (ObjectLayer, string, error) { if err != nil { return nil, "", err } - obj, _, err := initObjectLayer(fsDirs, nil) + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + return nil, "", err + } + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { removeRoots(fsDirs) return nil, "", err @@ -75,7 +79,11 @@ func prepareXL() (ObjectLayer, []string, error) { if err != nil { return nil, nil, err } - obj, _, err := initObjectLayer(fsDirs, nil) + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + return nil, nil, err + } + obj, _, err := initObjectLayer(endpoints, nil) if err != nil { removeRoots(fsDirs) return nil, nil, err @@ -145,7 +153,7 @@ func isSameType(obj1, obj2 interface{}) bool { // defer s.Stop() type TestServer struct { Root string - Disks []string + Disks []storageEndPoint AccessKey string SecretKey string Server *httptest.Server @@ -174,17 +182,20 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer { credentials := serverConfig.GetCredential() testServer.Root = root - testServer.Disks = disks + testServer.Disks, err = parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } testServer.AccessKey = credentials.AccessKeyID testServer.SecretKey = credentials.SecretAccessKey - objLayer, storageDisks, err := initObjectLayer(disks, nil) + objLayer, storageDisks, err := initObjectLayer(testServer.Disks, nil) if err != nil { t.Fatalf("Failed obtaining Temp Backend: %s", err) } srvCmdCfg := serverCmdConfig{ - disks: disks, + endPoints: testServer.Disks, storageDisks: storageDisks, } httpHandler, err := configureServerHandler( @@ -214,7 +225,11 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer { t.Fatal("Early setup error:", err) } globalMinioAddr = getLocalAddress(srvCmdCfg) - initGlobalS3Peers(disks) + endpoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatal("Early setup error:", err) + } + initGlobalS3Peers(endpoints) return testServer } @@ -236,6 +251,10 @@ func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int) if err != nil { t.Fatal("Failed to create disks for the backend") } + endPoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatalf("%s", err) + } root, err := newTestConfig("us-east-1") if err != nil { @@ -248,13 +267,13 @@ func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int) credentials := serverConfig.GetCredential() testRPCServer.Root = root - testRPCServer.Disks = disks + testRPCServer.Disks = endPoints testRPCServer.AccessKey = credentials.AccessKeyID testRPCServer.SecretKey = credentials.SecretAccessKey // Run TestServer. testRPCServer.Server = httptest.NewServer(initTestStorageRPCEndPoint(serverCmdConfig{ - disks: disks, + endPoints: endPoints, })) return testRPCServer } @@ -267,6 +286,10 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer { if err != nil { t.Fatal("Failed to create disks for the backend") } + endPoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatalf("%s", err) + } root, err := newTestConfig("us-east-1") if err != nil { @@ -279,12 +302,12 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer { credentials := serverConfig.GetCredential() testRPCServer.Root = root - testRPCServer.Disks = disks + testRPCServer.Disks = endPoints testRPCServer.AccessKey = credentials.AccessKeyID testRPCServer.SecretKey = credentials.SecretAccessKey // create temporary backend for the test server. - objLayer, storageDisks, err := initObjectLayer(disks, nil) + objLayer, storageDisks, err := initObjectLayer(endPoints, nil) if err != nil { t.Fatalf("Failed obtaining Temp Backend: %s", err) } @@ -295,7 +318,7 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer { globalObjLayerMutex.Unlock() srvCfg := serverCmdConfig{ - disks: disks, + endPoints: endPoints, storageDisks: storageDisks, } @@ -335,6 +358,10 @@ func StartTestControlRPCServer(t TestErrHandler, instanceType string) TestServer if err != nil { t.Fatal("Failed to create disks for the backend") } + endPoints, err := parseStorageEndPoints(disks, 0) + if err != nil { + t.Fatalf("%s", err) + } root, err := newTestConfig("us-east-1") if err != nil { @@ -347,12 +374,12 @@ func StartTestControlRPCServer(t TestErrHandler, instanceType string) TestServer credentials := serverConfig.GetCredential() testRPCServer.Root = root - testRPCServer.Disks = disks + testRPCServer.Disks = endPoints testRPCServer.AccessKey = credentials.AccessKeyID testRPCServer.SecretKey = credentials.SecretAccessKey // create temporary backend for the test server. - objLayer, storageDisks, err := initObjectLayer(disks, nil) + objLayer, storageDisks, err := initObjectLayer(endPoints, nil) if err != nil { t.Fatalf("Failed obtaining Temp Backend: %s", err) } @@ -401,7 +428,7 @@ func newTestConfig(bucketLocation string) (rootPath string, err error) { func (testServer TestServer) Stop() { removeAll(testServer.Root) for _, disk := range testServer.Disks { - removeAll(disk) + removeAll(disk.path) } testServer.Server.Close() } @@ -1454,8 +1481,8 @@ func getRandomDisks(N int) ([]string, error) { } // initObjectLayer - Instantiates object layer and returns it. -func initObjectLayer(disks []string, ignoredDisks []string) (ObjectLayer, []StorageAPI, error) { - storageDisks, err := initStorageDisks(disks, ignoredDisks) +func initObjectLayer(endPoints []storageEndPoint, ignoredEndPoints []storageEndPoint) (ObjectLayer, []StorageAPI, error) { + storageDisks, err := initStorageDisks(endPoints, ignoredEndPoints) if err != nil { return nil, nil, err } @@ -1531,7 +1558,12 @@ func prepareXLStorageDisks(t *testing.T) ([]StorageAPI, []string) { if err != nil { t.Fatal("Unexpected error: ", err) } - _, storageDisks, err := initObjectLayer(fsDirs, nil) + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatal("Unexpected error: ", err) + } + + _, storageDisks, err := initObjectLayer(endpoints, nil) if err != nil { removeRoots(fsDirs) t.Fatal("Unable to initialize storage disks", err) @@ -1815,7 +1847,11 @@ func ExecObjectLayerStaleFilesTest(t *testing.T, objTest objTestStaleFilesType) if err != nil { t.Fatalf("Initialization of disks for XL setup: %s", err) } - objLayer, _, err := initObjectLayer(erasureDisks, nil) + endpoints, err := parseStorageEndPoints(erasureDisks, 0) + if err != nil { + t.Fatalf("Initialization of disks for XL setup: %s", err) + } + objLayer, _, err := initObjectLayer(endpoints, nil) if err != nil { t.Fatalf("Initialization of object layer failed for XL setup: %s", err) } diff --git a/cmd/tree-walk_test.go b/cmd/tree-walk_test.go index fcb76e29f..7426512c5 100644 --- a/cmd/tree-walk_test.go +++ b/cmd/tree-walk_test.go @@ -163,11 +163,15 @@ func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) { func TestTreeWalk(t *testing.T) { fsDir, err := ioutil.TempDir("", "minio-") if err != nil { - t.Errorf("Unable to create tmp directory: %s", err) + t.Fatalf("Unable to create tmp directory: %s", err) } - disk, err := newStorageAPI(fsDir) + endpoint, err := parseStorageEndPoint(fsDir, 0) if err != nil { - t.Errorf("Unable to create StorageAPI: %s", err) + t.Fatalf("Unexpected error %s", err) + } + disk, err := newStorageAPI(endpoint) + if err != nil { + t.Fatalf("Unable to create StorageAPI: %s", err) } var files = []string{ @@ -200,11 +204,15 @@ func TestTreeWalk(t *testing.T) { func TestTreeWalkTimeout(t *testing.T) { fsDir, err := ioutil.TempDir("", "minio-") if err != nil { - t.Errorf("Unable to create tmp directory: %s", err) + t.Fatalf("Unable to create tmp directory: %s", err) } - disk, err := newStorageAPI(fsDir) + endpoint, err := parseStorageEndPoint(fsDir, 0) if err != nil { - t.Errorf("Unable to create StorageAPI: %s", err) + t.Fatalf("Unexpected error %s", err) + } + disk, err := newStorageAPI(endpoint) + if err != nil { + t.Fatalf("Unable to create StorageAPI: %s", err) } var myfiles []string // Create maxObjectsList+1 number of entries. @@ -278,12 +286,23 @@ func TestListDir(t *testing.T) { t.Errorf("Unable to create tmp directory: %s", err) } + endpoint1, err := parseStorageEndPoint(fsDir1, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } + // Create two StorageAPIs disk1 and disk2. - disk1, err := newStorageAPI(fsDir1) + disk1, err := newStorageAPI(endpoint1) if err != nil { t.Errorf("Unable to create StorageAPI: %s", err) } - disk2, err := newStorageAPI(fsDir2) + + endpoint2, err := parseStorageEndPoint(fsDir2, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } + + disk2, err := newStorageAPI(endpoint2) if err != nil { t.Errorf("Unable to create StorageAPI: %s", err) } @@ -348,13 +367,18 @@ func TestRecursiveTreeWalk(t *testing.T) { // Create a backend directories fsDir1. fsDir1, err := ioutil.TempDir("", "minio-") if err != nil { - t.Errorf("Unable to create tmp directory: %s", err) + t.Fatalf("Unable to create tmp directory: %s", err) + } + + endpoint1, err := parseStorageEndPoint(fsDir1, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) } // Create two StorageAPIs disk1. - disk1, err := newStorageAPI(fsDir1) + disk1, err := newStorageAPI(endpoint1) if err != nil { - t.Errorf("Unable to create StorageAPI: %s", err) + t.Fatalf("Unable to create StorageAPI: %s", err) } // Simple isLeaf check, returns true if there is no trailing "/" @@ -458,8 +482,13 @@ func TestSortedness(t *testing.T) { t.Errorf("Unable to create tmp directory: %s", err) } + endpoint1, err := parseStorageEndPoint(fsDir1, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } + // Create two StorageAPIs disk1. - disk1, err := newStorageAPI(fsDir1) + disk1, err := newStorageAPI(endpoint1) if err != nil { t.Errorf("Unable to create StorageAPI: %s", err) } @@ -533,8 +562,13 @@ func TestTreeWalkIsEnd(t *testing.T) { t.Errorf("Unable to create tmp directory: %s", err) } + endpoint1, err := parseStorageEndPoint(fsDir1, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } + // Create two StorageAPIs disk1. - disk1, err := newStorageAPI(fsDir1) + disk1, err := newStorageAPI(endpoint1) if err != nil { t.Errorf("Unable to create StorageAPI: %s", err) } diff --git a/cmd/utils.go b/cmd/utils.go index 087658be9..e85361a3f 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -46,7 +46,7 @@ func cloneHeader(h http.Header) http.Header { } // checkDuplicates - function to validate if there are duplicates in a slice of strings. -func checkDuplicates(list []string) error { +func checkDuplicateStrings(list []string) error { // Empty lists are not allowed. if len(list) == 0 { return errInvalidArgument @@ -72,6 +72,15 @@ func checkDuplicates(list []string) error { return nil } +// checkDuplicates - function to validate if there are duplicates in a slice of endPoints. +func checkDuplicateEndPoints(list []storageEndPoint) error { + var strs []string + for _, ep := range list { + strs = append(strs, ep.String()) + } + return checkDuplicateStrings(strs) +} + // splits network path into its components Address and Path. func splitNetPath(networkPath string) (netAddr, netPath string, err error) { if runtime.GOOS == "windows" { @@ -99,14 +108,10 @@ func getLocalAddress(srvCmdConfig serverCmdConfig) string { if !srvCmdConfig.isDistXL { return srvCmdConfig.serverAddr } - for _, export := range srvCmdConfig.disks { + for _, ep := range srvCmdConfig.endPoints { // Validates if remote disk is local. - if isLocalStorage(export) { - var host string - if idx := strings.LastIndex(export, ":"); idx != -1 { - host = export[:idx] - } - return fmt.Sprintf("%s:%d", host, globalMinioPort) + if isLocalStorage(ep) { + return fmt.Sprintf("%s:%d", ep.host, ep.port) } } return "" diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 0b387d553..a046f5c52 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -89,7 +89,7 @@ func TestCheckDuplicates(t *testing.T) { // Validate if function runs as expected. for i, test := range tests { - err := checkDuplicates(test.list) + err := checkDuplicateStrings(test.list) if test.shouldPass && err != test.err { t.Errorf("Test: %d, Expected %s got %s", i+1, test.err, err) } @@ -233,11 +233,11 @@ func TestLocalAddress(t *testing.T) { { srvCmdConfig: serverCmdConfig{ isDistXL: true, - disks: []string{ - "localhost:/mnt/disk1", - "1.1.1.2:/mnt/disk2", - "1.1.2.1:/mnt/disk3", - "1.1.2.2:/mnt/disk4", + endPoints: []storageEndPoint{ + {"localhost", 9000, "/mnt/disk1"}, + {"1.1.1.2", 9000, "/mnt/disk2"}, + {"1.1.2.1", 9000, "/mnt/disk3"}, + {"1.1.2.2", 9000, "/mnt/disk4"}, }, }, localAddr: fmt.Sprintf("localhost:%d", globalMinioPort), @@ -247,11 +247,11 @@ func TestLocalAddress(t *testing.T) { srvCmdConfig: serverCmdConfig{ serverAddr: fmt.Sprintf(":%d", globalMinioPort), isDistXL: false, - disks: []string{ - "/mnt/disk1", - "/mnt/disk2", - "/mnt/disk3", - "/mnt/disk4", + endPoints: []storageEndPoint{ + {path: "/mnt/disk1"}, + {path: "/mnt/disk2"}, + {path: "/mnt/disk3"}, + {path: "/mnt/disk4"}, }, }, localAddr: fmt.Sprintf(":%d", globalMinioPort), @@ -260,11 +260,11 @@ func TestLocalAddress(t *testing.T) { { srvCmdConfig: serverCmdConfig{ isDistXL: true, - disks: []string{ - "1.1.1.1:/mnt/disk1", - "1.1.1.2:/mnt/disk2", - "1.1.2.1:/mnt/disk3", - "1.1.2.2:/mnt/disk4", + endPoints: []storageEndPoint{ + {"1.1.1.1", 9000, "/mnt/disk1"}, + {"1.1.1.2", 9000, "/mnt/disk2"}, + {"1.1.2.1", 9000, "/mnt/disk3"}, + {"1.1.2.2", 9000, "/mnt/disk4"}, }, }, localAddr: "", @@ -276,11 +276,11 @@ func TestLocalAddress(t *testing.T) { srvCmdConfig: serverCmdConfig{ serverAddr: "play.minio.io:9000", isDistXL: false, - disks: []string{ - "/mnt/disk1", - "/mnt/disk2", - "/mnt/disk3", - "/mnt/disk4", + endPoints: []storageEndPoint{ + {path: "/mnt/disk1"}, + {path: "/mnt/disk2"}, + {path: "/mnt/disk3"}, + {path: "/mnt/disk4"}, }, }, localAddr: "play.minio.io:9000", diff --git a/cmd/xl-v1_test.go b/cmd/xl-v1_test.go index a1ff49fc7..5ada97883 100644 --- a/cmd/xl-v1_test.go +++ b/cmd/xl-v1_test.go @@ -51,7 +51,17 @@ func TestStorageInfo(t *testing.T) { t.Fatalf("Diskinfo total values should be greater 0") } - storageDisks, err := initStorageDisks(fsDirs, fsDirs[:4]) + endpoints, err := parseStorageEndPoints(fsDirs, 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } + + ignoredEndpoints, err := parseStorageEndPoints(fsDirs[:4], 0) + if err != nil { + t.Fatalf("Unexpected error %s", err) + } + + storageDisks, err := initStorageDisks(endpoints, ignoredEndpoints) if err != nil { t.Fatal("Unexpected error: ", err) } @@ -141,7 +151,12 @@ func TestNewXL(t *testing.T) { t.Fatalf("Unable to initialize erasure, %s", err) } - storageDisks, err := initStorageDisks(erasureDisks, nil) + endpoints, err := parseStorageEndPoints(erasureDisks, 0) + if err != nil { + t.Fatalf("Unable to initialize erasure, %s", err) + } + + storageDisks, err := initStorageDisks(endpoints, nil) if err != nil { t.Fatal("Unexpected error: ", err) } @@ -161,7 +176,16 @@ func TestNewXL(t *testing.T) { t.Fatalf("Unable to initialize erasure, %s", err) } - storageDisks, err = initStorageDisks(erasureDisks, erasureDisks[:2]) + endpoints, err = parseStorageEndPoints(erasureDisks, 0) + if err != nil { + t.Fatalf("Unable to initialize erasure, %s", err) + } + + ignoredEndpoints, err := parseStorageEndPoints(erasureDisks[:2], 0) + if err != nil { + t.Fatalf("Unable to initialize erasure, %s", err) + } + storageDisks, err = initStorageDisks(endpoints, ignoredEndpoints) if err != nil { t.Fatal("Unexpected error: ", err) }