From f22862aa2889b0d73fec3fb06cce27cd0efa286c Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 14 Oct 2016 19:57:40 -0700 Subject: [PATCH] heal: Refactor heal command. (#2901) - return errors for heal operation through rpc replies. - implement rotating wheel for healing status. Fixes #2491 --- .travis.yml | 4 +- cmd/control-handlers.go | 83 ++++++++++-- cmd/control-heal-main.go | 241 ++++++++++++++++++++------------- cmd/control-mains_test.go | 48 ++++++- cmd/control-router.go | 4 +- cmd/control_test.go | 82 ++++++----- cmd/errors.go | 6 +- cmd/format-config-v1.go | 12 +- cmd/format-config-v1_test.go | 14 +- cmd/fs-v1.go | 7 +- cmd/globals.go | 6 +- cmd/object-interface.go | 7 +- cmd/scan-bar.go | 92 +++++++++++++ cmd/utils.go | 13 ++ cmd/utils_test.go | 42 ++++++ cmd/xl-v1-bucket.go | 24 ---- cmd/xl-v1-list-objects-heal.go | 47 ++++--- cmd/xl-v1-object.go | 87 ++++++++++-- cmd/xl-v1-object_test.go | 9 +- cmd/xl-v1.go | 25 ++-- 20 files changed, 614 insertions(+), 239 deletions(-) create mode 100644 cmd/scan-bar.go diff --git a/.travis.yml b/.travis.yml index 81005ce32..a9df9a407 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,9 +6,9 @@ language: go os: - linux -- osx +#- osx -osx_image: xcode7.2 +#osx_image: xcode7.2 env: - ARCH=x86_64 diff --git a/cmd/control-handlers.go b/cmd/control-handlers.go index 51d87d13f..23ec8b664 100644 --- a/cmd/control-handlers.go +++ b/cmd/control-handlers.go @@ -68,7 +68,7 @@ type HealListArgs struct { type HealListReply struct { IsTruncated bool NextMarker string - Objects []string + Objects []ObjectInfo } // ListObjects - list all objects that needs healing. 
@@ -80,35 +80,64 @@ func (c *controlAPIHandlers) ListObjectsHealHandler(args *HealListArgs, reply *H if !isRPCTokenValid(args.Token) { return errInvalidToken } + if !c.IsXL { + return nil + } info, err := objAPI.ListObjectsHeal(args.Bucket, args.Prefix, args.Marker, args.Delimiter, args.MaxKeys) if err != nil { return err } reply.IsTruncated = info.IsTruncated reply.NextMarker = info.NextMarker - for _, obj := range info.Objects { - reply.Objects = append(reply.Objects, obj.Name) - } + reply.Objects = info.Objects return nil } +// HealBucketArgs - arguments for HealBucket RPC. +type HealBucketArgs struct { + // Authentication token generated by Login. + GenericArgs + + // Bucket to be healed. + Bucket string +} + +// Heals missing buckets across disks, if we have enough quorum. +func (c *controlAPIHandlers) HealBucketHandler(args *HealBucketArgs, reply *GenericReply) error { + objAPI := c.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + if !isRPCTokenValid(args.Token) { + return errInvalidToken + } + if !c.IsXL { + return nil + } + // Proceed to heal the bucket. + return objAPI.HealBucket(args.Bucket) +} + // HealObjectArgs - argument for HealObject RPC. type HealObjectArgs struct { // Authentication token generated by Login. GenericArgs - // Name of the bucket. + // Name of the bucket where the object + // needs to be healed. Bucket string - // Name of the object. - Object string + // Name of the object to be healed. + Objects []ObjectInfo } // HealObjectReply - reply by HealObject RPC. -type HealObjectReply struct{} +type HealObjectReply struct { + Results []string +} -// HealObject - heal the object. -func (c *controlAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *GenericReply) error { +// HealObject heals 1000 objects at a time for missing chunks, missing metadata on a given bucket. 
+func (c *controlAPIHandlers) HealObjectsHandler(args *HealObjectArgs, reply *HealObjectReply) error { objAPI := c.ObjectAPI() if objAPI == nil { return errServerNotInitialized @@ -116,15 +145,38 @@ func (c *controlAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *Gene if !isRPCTokenValid(args.Token) { return errInvalidToken } - return objAPI.HealObject(args.Bucket, args.Object) + if !c.IsXL { + return nil + } + + // Heal all objects that need healing. + var errs = make([]error, len(args.Objects)) + for idx, objInfo := range args.Objects { + errs[idx] = objAPI.HealObject(args.Bucket, objInfo.Name) + } + + // Get all the error causes. + var causes = make([]string, len(args.Objects)) + for id, err := range errs { + if err != nil { + causes[id] = err.Error() + } + } + + // Save the causes. + reply.Results = causes + return nil } -// HealObject - heal the object. -func (c *controlAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply *GenericReply) error { +// Heals backend storage format. +func (c *controlAPIHandlers) HealFormatHandler(args *GenericArgs, reply *GenericReply) error { if !isRPCTokenValid(args.Token) { return errInvalidToken } - err := repairDiskMetadata(c.StorageDisks) + if !c.IsXL { + return nil + } + err := healFormatXL(c.StorageDisks) if err != nil { return err } @@ -214,6 +266,9 @@ func (c *controlAPIHandlers) TryInitHandler(args *GenericArgs, reply *GenericRep if !isRPCTokenValid(args.Token) { return errInvalidToken } + if !c.IsXL { + return nil + } go func() { globalWakeupCh <- struct{}{} }() diff --git a/cmd/control-heal-main.go b/cmd/control-heal-main.go index 480bd699e..f2a334cd6 100644 --- a/cmd/control-heal-main.go +++ b/cmd/control-heal-main.go @@ -17,12 +17,13 @@ package cmd import ( + "errors" "fmt" "net/url" "path" - "strings" "github.com/minio/cli" + "github.com/minio/mc/pkg/console" ) var healCmd = cli.Command{ @@ -41,48 +42,154 @@ FLAGS: {{end}} EXAMPLES: - 1. Heal an object. + 1. 
Heal missing on-disk format across all inconsistent nodes. + $ minio control {{.Name}} http://localhost:9000 + + 2. Heals a specific object. $ minio control {{.Name}} http://localhost:9000/songs/classical/western/piano.mp3 - 2. Heal all objects in a bucket recursively. - $ minio control {{.Name}} http://localhost:9000/songs + 3. Heal bucket and all objects in a bucket recursively. + $ minio control {{.Name}} http://localhost:9000/songs - 3. Heall all objects with a given prefix recursively. - $ minio control {{.Name}} http://localhost:9000/songs/classical/ + 4. Heal all objects with a given prefix recursively. + $ minio control {{.Name}} http://localhost:9000/songs/classical/ `, } -func checkHealControlSyntax(ctx *cli.Context) { - if len(ctx.Args()) != 1 { - cli.ShowCommandHelpAndExit(ctx, "heal", 1) +// heals backend storage format, useful in restoring `format.json` missing on a +// fresh or corrupted disks. This call does deep inspection of backend layout +// and applies appropriate `format.json` to the disk. +func healStorageFormat(authClnt *AuthRPCClient) error { + args := &GenericArgs{} + reply := &GenericReply{} + return authClnt.Call("Control.HealFormatHandler", args, reply) +} + +// lists all objects which needs to be healed, this is a precursor helper function called before +// calling actual healing operation. Returns a maximum of 1000 objects that needs healing at a time. +// Marker indicates the next entry point where the listing will start. +func listObjectsHeal(authClnt *AuthRPCClient, bucketName, prefixName, markerName string) (*HealListReply, error) { + args := &HealListArgs{ + Bucket: bucketName, + Prefix: prefixName, + Marker: markerName, + Delimiter: "", + MaxKeys: 1000, + } + reply := &HealListReply{} + err := authClnt.Call("Control.ListObjectsHealHandler", args, reply) + if err != nil { + return nil, err } + return reply, nil } -// "minio control heal" entry point. 
-func healControl(ctx *cli.Context) { +// Internal custom struct encapsulates pretty msg to be printed by the caller. +type healMsg struct { + Msg string + Err error +} + +// Prettifies heal results and returns them over a channel, caller reads from this channel and prints. +func prettyHealResults(healedObjects []ObjectInfo, healReply *HealObjectReply) <-chan healMsg { + var msgCh = make(chan healMsg) + + // Starts writing to message channel for the list of results sent back + // by a previous healing operation. + go func(msgCh chan<- healMsg) { + defer close(msgCh) + // Go through all the results and validate if we have success or failure. + for i, healStr := range healReply.Results { + objPath := path.Join(healedObjects[i].Bucket, healedObjects[i].Name) + // TODO: We need to still print heal error cause. + if healStr != "" { + msgCh <- healMsg{ + Msg: fmt.Sprintf("%s %s", colorRed("FAILED"), objPath), + Err: errors.New(healStr), + } + continue + } + msgCh <- healMsg{ + Msg: fmt.Sprintf("%s %s", colorGreen("SUCCESS"), objPath), + } + } + }(msgCh) + + // Return .. + return msgCh +} - checkHealControlSyntax(ctx) - - // Parse bucket and object from url.URL.Path - parseBucketObject := func(path string) (bucketName string, objectName string) { - splits := strings.SplitN(path, string(slashSeparator), 3) - switch len(splits) { - case 0, 1: - bucketName = "" - objectName = "" - case 2: - bucketName = splits[1] - objectName = "" - case 3: - bucketName = splits[1] - objectName = splits[2] +var scanBar = scanBarFactory() +// Heals all the objects under a given bucket, optionally you can specify an +// object prefix to heal objects under this prefix. +func healObjects(authClnt *AuthRPCClient, bucketName, prefixName string) error { + if authClnt == nil || bucketName == "" { + return errInvalidArgument + } + // Save marker for the next request. 
+ var markerName string + for { + healListReply, err := listObjectsHeal(authClnt, bucketName, prefixName, markerName) + if err != nil { + return err + } + + // Attempt to heal only if there are any objects to heal. + if len(healListReply.Objects) > 0 { + healArgs := &HealObjectArgs{ + Bucket: bucketName, + Objects: healListReply.Objects, + } + + healReply := &HealObjectReply{} + err = authClnt.Call("Control.HealObjectsHandler", healArgs, healReply) + if err != nil { + return err + } + + // Pretty print all the heal results. + for msg := range prettyHealResults(healArgs.Objects, healReply) { + if msg.Err != nil { + // TODO we need to print the error cause as well. + scanBar(msg.Msg) + continue + } + // Success. + scanBar(msg.Msg) + } + } + + // End of listing objects for healing. + if !healListReply.IsTruncated { + break } - return bucketName, objectName + + // Set the marker to list the next set of keys. + markerName = healListReply.NextMarker + } + return nil +} - parsedURL, err := url.Parse(ctx.Args()[0]) - fatalIf(err, "Unable to parse URL") +// Heals your bucket for any missing entries. +func healBucket(authClnt *AuthRPCClient, bucketName string) error { + if authClnt == nil || bucketName == "" { + return errInvalidArgument + } + return authClnt.Call("Control.HealBucketHandler", &HealBucketArgs{ + Bucket: bucketName, + }, &GenericReply{}) +} + +// Entry point for minio control heal command. 
+func healControl(ctx *cli.Context) { + if ctx.Args().Present() && len(ctx.Args()) != 1 { + cli.ShowCommandHelpAndExit(ctx, "heal", 1) + } + + parsedURL, err := url.Parse(ctx.Args().Get(0)) + fatalIf(err, "Unable to parse URL %s", ctx.Args().Get(0)) authCfg := &authConfig{ accessKey: serverConfig.GetCredential().AccessKeyID, @@ -92,71 +199,19 @@ func healControl(ctx *cli.Context) { path: path.Join(reservedBucket, controlPath), loginMethod: "Control.LoginHandler", } - client := newAuthClient(authCfg) - // Always try to fix disk metadata - fmt.Print("Checking and healing disk metadata..") - args := &GenericArgs{} - reply := &GenericReply{} - err = client.Call("Control.HealDiskMetadataHandler", args, reply) - fatalIf(err, "Unable to heal disk metadata.") - fmt.Println(" ok") - - bucketName, objectName := parseBucketObject(parsedURL.Path) - if bucketName == "" { - return - } - - // If object does not have trailing "/" then it's an object, hence heal it. - if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) { - fmt.Printf("Healing : /%s/%s\n", bucketName, objectName) - args := &HealObjectArgs{Bucket: bucketName, Object: objectName} - reply := &HealObjectReply{} - err = client.Call("Control.HealObjectHandler", args, reply) - errorIf(err, "Healing object %s failed.", objectName) + client := newAuthClient(authCfg) + if parsedURL.Path == "/" || parsedURL.Path == "" { + err = healStorageFormat(client) + fatalIf(err, "Unable to heal disk metadata.") return } - - // If object is "" then heal the bucket first. - if objectName == "" { - fmt.Printf("Healing : /%s\n", bucketName) - args := &HealObjectArgs{Bucket: bucketName, Object: ""} - reply := &HealObjectReply{} - err = client.Call("Control.HealObjectHandler", args, reply) - fatalIf(err, "Healing bucket %s failed.", bucketName) - // Continue to heal the objects in the bucket. - } - - // Recursively list and heal the objects. 
- prefix := objectName - marker := "" - for { - args := &HealListArgs{ - Bucket: bucketName, - Prefix: prefix, - Marker: marker, - Delimiter: "", - MaxKeys: 1000, - } - reply := &HealListReply{} - err = client.Call("Control.ListObjectsHealHandler", args, reply) - fatalIf(err, "Unable to list objects for healing.") - - // Heal the objects returned in the ListObjects reply. - for _, obj := range reply.Objects { - fmt.Printf("Healing : /%s/%s\n", bucketName, obj) - reply := &GenericReply{} - healArgs := &HealObjectArgs{Bucket: bucketName, Object: obj} - err = client.Call("Control.HealObjectHandler", healArgs, reply) - errorIf(err, "Healing object %s failed.", obj) - } - - if !reply.IsTruncated { - // End of listing. - break - } - - // Set the marker to list the next set of keys. - marker = reply.NextMarker - } + bucketName, prefixName := urlPathSplit(parsedURL.Path) + // Heal the bucket. + err = healBucket(client, bucketName) + fatalIf(err, "Unable to heal bucket %s", bucketName) + // Heal all the objects. 
+ err = healObjects(client, bucketName, prefixName) + fatalIf(err, "Unable to heal objects on bucket %s at prefix %s", bucketName, prefixName) + console.Println() } diff --git a/cmd/control-mains_test.go b/cmd/control-mains_test.go index ed6a552eb..54c017831 100644 --- a/cmd/control-mains_test.go +++ b/cmd/control-mains_test.go @@ -17,6 +17,10 @@ package cmd import ( + "bytes" + "crypto/rand" + "os" + "path" "testing" "github.com/minio/cli" @@ -43,7 +47,49 @@ func TestControlHealMain(t *testing.T) { // run app err := app.Run(args) if err != nil { - t.Errorf("Control-Heal-Main test failed with - %s", err.Error()) + t.Errorf("Control-Heal-Format-Main test failed with - %s", err.Error()) + } + + obj := newObjectLayerFn() + // Create "bucket" + err = obj.MakeBucket("bucket") + if err != nil { + t.Fatal(err) + } + + bucket := "bucket" + object := "object" + + data := make([]byte, 1*1024*1024) + length := int64(len(data)) + _, err = rand.Read(data) + if err != nil { + t.Fatal(err) + } + + _, err = obj.PutObject(bucket, object, length, bytes.NewReader(data), nil, "") + if err != nil { + t.Fatal(err) + } + + // Remove the object - to simulate the case where the disk was down when the object was created. 
+ err = os.RemoveAll(path.Join(testServer.Disks[0], bucket, object)) + if err != nil { + t.Fatal(err) + } + + args = []string{"./minio", "control", "heal", url + "/bucket"} + // run app + err = app.Run(args) + if err != nil { + t.Errorf("Control-Heal-Bucket-Main test failed with - %s", err.Error()) + } + + args = []string{"./minio", "control", "heal", url + "/bucket/object"} + // run app + err = app.Run(args) + if err != nil { + t.Errorf("Control-Heal-Bucket-With-Prefix-Main test failed with - %s", err.Error()) } } diff --git a/cmd/control-router.go b/cmd/control-router.go index ce2f4459f..748254b8a 100644 --- a/cmd/control-router.go +++ b/cmd/control-router.go @@ -73,9 +73,10 @@ func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient { // operations on server. type controlAPIHandlers struct { ObjectAPI func() ObjectLayer - StorageDisks []StorageAPI + IsXL bool RemoteControls []*AuthRPCClient LocalNode string + StorageDisks []StorageAPI } // Register control RPC handlers. @@ -83,6 +84,7 @@ func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) // Initialize Control. 
ctrlHandlers := &controlAPIHandlers{ ObjectAPI: newObjectLayerFn, + IsXL: srvCmdConfig.isDistXL || len(srvCmdConfig.storageDisks) > 1, RemoteControls: initRemoteControlClients(srvCmdConfig), LocalNode: getLocalAddress(srvCmdConfig), StorageDisks: srvCmdConfig.storageDisks, diff --git a/cmd/control_test.go b/cmd/control_test.go index 652688c2c..904a0ce3a 100644 --- a/cmd/control_test.go +++ b/cmd/control_test.go @@ -270,77 +270,91 @@ func (s *TestRPCControlSuite) testRPCControlLock(c *testing.T) { } func TestControlHealDiskMetadataH(t *testing.T) { - //setup code + // Setup code s := &TestRPCControlSuite{serverType: "XL"} s.SetUpSuite(t) - //run test - s.testControlHealDiskMetadataH(t) + // Run test + s.testControlHealFormatH(t) - //teardown code + // Teardown code s.TearDownSuite(t) } -// TestControlHandlerHealDiskMetadata - Registers and call the `HealDiskMetadataHandler`, asserts to validate the success. -func (s *TestRPCControlSuite) testControlHealDiskMetadataH(c *testing.T) { +// TestControlHandlerHealFormat - Registers and call the `HealFormatHandler`, asserts to validate the success. +func (s *TestRPCControlSuite) testControlHealFormatH(c *testing.T) { // The suite has already started the test RPC server, just send RPC calls. 
client := newAuthClient(s.testAuthConf) defer client.Close() args := &GenericArgs{} reply := &GenericReply{} - err := client.Call("Control.HealDiskMetadataHandler", args, reply) + err := client.Call("Control.HealFormatHandler", args, reply) if err != nil { - c.Errorf("Control.HealDiskMetadataH - test failed with %s", err) + c.Errorf("Test failed with %s", err) } } func TestControlHealObjectH(t *testing.T) { - //setup code + // Setup code s := &TestRPCControlSuite{serverType: "XL"} s.SetUpSuite(t) - //run test - s.testControlHealObjectH(t) + // Run test + s.testControlHealObjectsH(t) - //teardown code + // Teardown code s.TearDownSuite(t) } -func (s *TestRPCControlSuite) testControlHealObjectH(t *testing.T) { +func (s *TestRPCControlSuite) testControlHealObjectsH(t *testing.T) { client := newAuthClient(s.testAuthConf) defer client.Close() - err := newObjectLayerFn().MakeBucket("testbucket") + objAPI := newObjectLayerFn() + + err := objAPI.MakeBucket("testbucket") if err != nil { - t.Fatalf( - "Control.HealObjectH - create bucket failed with %s", err) + t.Fatalf("Create bucket failed with %s", err) } datum := strings.NewReader("a") - _, err = newObjectLayerFn().PutObject("testbucket", "testobject", 1, datum, nil, "") + _, err = objAPI.PutObject("testbucket", "testobject1", 1, datum, nil, "") if err != nil { - t.Fatalf("Control.HealObjectH - put object failed with %s", err) + t.Fatalf("Put object failed with %s", err) + } + datum = strings.NewReader("a") + _, err = objAPI.PutObject("testbucket", "testobject2", 1, datum, nil, "") + if err != nil { + t.Fatalf("Put object failed with %s", err) } - args := &HealObjectArgs{GenericArgs{}, "testbucket", "testobject"} - reply := &GenericReply{} - err = client.Call("Control.HealObjectHandler", args, reply) - + args := &HealObjectArgs{ + Bucket: "testbucket", + Objects: []ObjectInfo{ + { + Name: "testobject1", + }, { + Name: "testobject2", + }, + }, + } + reply := &HealObjectReply{} + err = 
client.Call("Control.HealObjectsHandler", args, reply) if err != nil { - t.Errorf("Control.HealObjectH - test failed with %s", err) + t.Errorf("Test failed with %s", err) } } func TestControlListObjectsHealH(t *testing.T) { - //setup code + // Setup code s := &TestRPCControlSuite{serverType: "XL"} s.SetUpSuite(t) - //run test + // Run test s.testControlListObjectsHealH(t) - //teardown code + // Teardown code s.TearDownSuite(t) } @@ -348,17 +362,18 @@ func (s *TestRPCControlSuite) testControlListObjectsHealH(t *testing.T) { client := newAuthClient(s.testAuthConf) defer client.Close() - // careate a bucket - err := newObjectLayerFn().MakeBucket("testbucket") + objAPI := newObjectLayerFn() + + // Create a bucket + err := objAPI.MakeBucket("testbucket") if err != nil { - t.Fatalf( - "Control.ListObjectsHealH - create bucket failed - %s", err) + t.Fatalf("Create bucket failed - %s", err) } r := strings.NewReader("0") - _, err = newObjectLayerFn().PutObject("testbucket", "testObj-0", 1, r, nil, "") + _, err = objAPI.PutObject("testbucket", "testObj-0", 1, r, nil, "") if err != nil { - t.Fatalf("Control.ListObjectsHealH - object creation failed - %s", err) + t.Fatalf("Object creation failed - %s", err) } args := &HealListArgs{ @@ -367,8 +382,7 @@ func (s *TestRPCControlSuite) testControlListObjectsHealH(t *testing.T) { } reply := &GenericReply{} err = client.Call("Control.ListObjectsHealHandler", args, reply) - if err != nil { - t.Errorf("Control.ListObjectsHealHandler - test failed - %s", err) + t.Errorf("Test failed - %s", err) } } diff --git a/cmd/errors.go b/cmd/errors.go index b6fbf1984..fea32c10c 100644 --- a/cmd/errors.go +++ b/cmd/errors.go @@ -111,12 +111,12 @@ func errorCause(err error) error { // Returns slice of underlying cause error. 
func errorsCause(errs []error) []error { - Errs := make([]error, len(errs)) + cerrs := make([]error, len(errs)) for i, err := range errs { if err == nil { continue } - Errs[i] = errorCause(err) + cerrs[i] = errorCause(err) } - return Errs + return cerrs } diff --git a/cmd/format-config-v1.go b/cmd/format-config-v1.go index 81433d920..65b2a49f5 100644 --- a/cmd/format-config-v1.go +++ b/cmd/format-config-v1.go @@ -743,9 +743,10 @@ func healFormatXLCorruptedDisks(storageDisks []StorageAPI) error { // loadFormatXL - loads XL `format.json` and returns back properly // ordered storage slice based on `format.json`. -func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { +func loadFormatXL(bootstrapDisks []StorageAPI, readQuorum int) (disks []StorageAPI, err error) { var unformattedDisksFoundCnt = 0 var diskNotFoundCount = 0 + var corruptedDisksFoundCnt = 0 formatConfigs := make([]*formatConfigV1, len(bootstrapDisks)) // Try to load `format.json` bootstrap disks. @@ -763,6 +764,9 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { } else if err == errDiskNotFound { diskNotFoundCount++ continue + } else if err == errCorruptedFormat { + corruptedDisksFoundCnt++ + continue } return nil, err } @@ -771,11 +775,13 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { } // If all disks indicate that 'format.json' is not available return 'errUnformattedDisk'. 
- if unformattedDisksFoundCnt > len(bootstrapDisks)-(len(bootstrapDisks)/2+1) { + if unformattedDisksFoundCnt > len(bootstrapDisks)-readQuorum { return nil, errUnformattedDisk + } else if corruptedDisksFoundCnt > len(bootstrapDisks)-readQuorum { + return nil, errCorruptedFormat } else if diskNotFoundCount == len(bootstrapDisks) { return nil, errDiskNotFound - } else if diskNotFoundCount > len(bootstrapDisks)-(len(bootstrapDisks)/2+1) { + } else if diskNotFoundCount > len(bootstrapDisks)-readQuorum { return nil, errXLReadQuorum } diff --git a/cmd/format-config-v1_test.go b/cmd/format-config-v1_test.go index 98556cafc..b1cfbf482 100644 --- a/cmd/format-config-v1_test.go +++ b/cmd/format-config-v1_test.go @@ -292,7 +292,7 @@ func TestFormatXLHealFreshDisks(t *testing.T) { } // Load again XL format.json to validate it - _, err = loadFormatXL(storageDisks) + _, err = loadFormatXL(storageDisks, 8) if err != nil { t.Fatal("loading healed disk failed: ", err) } @@ -322,7 +322,7 @@ func TestFormatXLHealFreshDisksErrorExpected(t *testing.T) { prepareNOfflineDisks(storageDisks, 16, t) // Load again XL format.json to validate it - _, err = loadFormatXL(storageDisks) + _, err = loadFormatXL(storageDisks, 8) if err == nil { t.Fatal("loading format disk error") } @@ -401,7 +401,7 @@ func TestFormatXLHealCorruptedDisks(t *testing.T) { } // Load again XL format.json to validate it - _, err = loadFormatXL(permutedStorageDisks) + _, err = loadFormatXL(permutedStorageDisks, 8) if err != nil { t.Fatal("loading healed disk failed: ", err) } @@ -709,7 +709,7 @@ func TestLoadFormatXLErrs(t *testing.T) { t.Fatal("storage disk is not *posix type") } xl.storageDisks[10] = newNaughtyDisk(posixDisk, nil, errFaultyDisk) - if _, err = loadFormatXL(xl.storageDisks); err != errFaultyDisk { + if _, err = loadFormatXL(xl.storageDisks, 8); err != errFaultyDisk { t.Fatal("Got an unexpected error: ", err) } @@ -735,7 +735,7 @@ func TestLoadFormatXLErrs(t *testing.T) { } xl.storageDisks[i] = 
newNaughtyDisk(posixDisk, nil, errDiskNotFound) } - if _, err = loadFormatXL(xl.storageDisks); err != errXLReadQuorum { + if _, err = loadFormatXL(xl.storageDisks, 8); err != errXLReadQuorum { t.Fatal("Got an unexpected error: ", err) } @@ -757,7 +757,7 @@ func TestLoadFormatXLErrs(t *testing.T) { t.Fatal(err) } } - if _, err = loadFormatXL(xl.storageDisks); err != errUnformattedDisk { + if _, err = loadFormatXL(xl.storageDisks, 8); err != errUnformattedDisk { t.Fatal("Got an unexpected error: ", err) } @@ -777,7 +777,7 @@ func TestLoadFormatXLErrs(t *testing.T) { for i := 0; i < 16; i++ { xl.storageDisks[i] = nil } - if _, err := loadFormatXL(xl.storageDisks); err != errDiskNotFound { + if _, err := loadFormatXL(xl.storageDisks, 8); err != errDiskNotFound { t.Fatal("Got an unexpected error: ", err) } } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 3af03292e..e0cc5064c 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -643,7 +643,12 @@ func (fs fsObjects) HealObject(bucket, object string) error { return traceError(NotImplemented{}) } -// HealListObjects - list objects for healing. Valid only for XL +// HealBucket - no-op for fs, Valid only for XL. +func (fs fsObjects) HealBucket(bucket string) error { + return traceError(NotImplemented{}) +} + +// ListObjectsHeal - list all objects to be healed. Valid only for XL func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { return ListObjectsInfo{}, traceError(NotImplemented{}) } diff --git a/cmd/globals.go b/cmd/globals.go index ce3ed67cd..2e03a0680 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -76,6 +76,8 @@ var ( // global colors. 
var ( - colorBlue = color.New(color.FgBlue).SprintfFunc() - colorBold = color.New(color.Bold).SprintFunc() + colorRed = color.New(color.FgRed).SprintFunc() + colorBold = color.New(color.Bold).SprintFunc() + colorBlue = color.New(color.FgBlue).SprintfFunc() + colorGreen = color.New(color.FgGreen).SprintfFunc() ) diff --git a/cmd/object-interface.go b/cmd/object-interface.go index cae20833e..2c3b1ed7a 100644 --- a/cmd/object-interface.go +++ b/cmd/object-interface.go @@ -30,14 +30,12 @@ type ObjectLayer interface { ListBuckets() (buckets []BucketInfo, err error) DeleteBucket(bucket string) error ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) - ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) // Object operations. GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error) GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (objInto ObjectInfo, err error) DeleteObject(bucket, object string) error - HealObject(bucket, object string) error // Multipart operations. ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) @@ -46,4 +44,9 @@ type ObjectLayer interface { ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) AbortMultipartUpload(bucket, object, uploadID string) error CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []completePart) (md5 string, err error) + + // Healing operations. 
+ HealBucket(bucket string) error + HealObject(bucket, object string) error + ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) } diff --git a/cmd/scan-bar.go b/cmd/scan-bar.go new file mode 100644 index 000000000..fbfa5ad75 --- /dev/null +++ b/cmd/scan-bar.go @@ -0,0 +1,92 @@ +/* + * Minio Cloud Storage (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "runtime" + "strings" + + "github.com/cheggaaa/pb" + "github.com/dustin/go-humanize" + "github.com/minio/mc/pkg/console" +) + +// fixateScanBar truncates or stretches text to fit within the terminal size. +func fixateScanBar(text string, width int) string { + if len([]rune(text)) > width { + // Trim text to fit within the screen + trimSize := len([]rune(text)) - width + 3 //"..." + if trimSize < len([]rune(text)) { + text = "..." + text[trimSize:] + } + } else { + text += strings.Repeat(" ", width-len([]rune(text))) + } + return text +} + +// Progress bar function report objects being scaned. +type scanBarFunc func(string) + +// scanBarFactory returns a progress bar function to report URL scanning. +func scanBarFactory() scanBarFunc { + fileCount := 0 + termWidth, err := pb.GetTerminalWidth() + if err != nil { + termWidth = 80 + } + + // Cursor animate channel. 
+ cursorCh := cursorAnimate() + return func(source string) { + scanPrefix := fmt.Sprintf("[%s] %s ", humanize.Comma(int64(fileCount)), string(<-cursorCh)) + source = fixateScanBar(source, termWidth-len([]rune(scanPrefix))) + barText := scanPrefix + source + console.PrintC("\r" + barText + "\r") + fileCount++ + } +} + +// cursorAnimate - returns a animated rune through read channel for every read. +func cursorAnimate() <-chan rune { + cursorCh := make(chan rune) + var cursors string + + switch runtime.GOOS { + case "linux": + // cursors = "➩➪➫➬➭➮➯➱" + // cursors = "▁▃▄▅▆▇█▇▆▅▄▃" + cursors = "◐◓◑◒" + // cursors = "←↖↑↗→↘↓↙" + // cursors = "◴◷◶◵" + // cursors = "◰◳◲◱" + //cursors = "⣾⣽⣻⢿⡿⣟⣯⣷" + case "darwin": + cursors = "◐◓◑◒" + default: + cursors = "|/-\\" + } + go func() { + for { + for _, cursor := range cursors { + cursorCh <- cursor + } + } + }() + return cursorCh +} diff --git a/cmd/utils.go b/cmd/utils.go index e5ca67cc2..63eb0ad8d 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -163,6 +163,19 @@ func contains(stringList []string, element string) bool { return false } +// urlPathSplit - split url path into bucket and object components. +func urlPathSplit(urlPath string) (bucketName, prefixName string) { + if urlPath == "" { + return urlPath, "" + } + urlPath = strings.TrimPrefix(urlPath, "/") + i := strings.Index(urlPath, "/") + if i != -1 { + return urlPath[:i], urlPath[i+1:] + } + return urlPath, "" +} + // Starts a profiler returns nil if profiler is not enabled, caller needs to handle this. func startProfiler(profiler string) interface { Stop() diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 3a9ecd45c..9fd797978 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -124,6 +124,48 @@ func TestMaxObjectSize(t *testing.T) { } } +// Test urlPathSplit. 
+func TestURLPathSplit(t *testing.T) { + type test struct { + urlPath string + bucketName string + prefixName string + } + + testCases := []test{ + { + urlPath: "/b/c/", + bucketName: "b", + prefixName: "c/", + }, + { + urlPath: "c/aa", + bucketName: "c", + prefixName: "aa", + }, + { + urlPath: "", + bucketName: "", + prefixName: "", + }, + { + urlPath: "/b", + bucketName: "b", + prefixName: "", + }, + } + + for i, testCase := range testCases { + bucketName, prefixName := urlPathSplit(testCase.urlPath) + if bucketName != testCase.bucketName { + t.Errorf("Tets %d: Expected %s, %s", i+1, testCase.bucketName, bucketName) + } + if prefixName != testCase.prefixName { + t.Errorf("Tets %d: Expected %s, %s", i+1, testCase.bucketName, bucketName) + } + } +} + // Tests minimum allowed part size. func TestMinAllowedPartSize(t *testing.T) { sizes := []struct { diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index b660c1296..45cb33851 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -305,27 +305,3 @@ func (xl xlObjects) DeleteBucket(bucket string) error { // Success. return nil } - -// Heal bucket - create buckets on disks where it does not exist. 
-func healBucket(disks []StorageAPI, bucket string) error { - bucketFound := false - for _, disk := range disks { - _, err := disk.StatVol(bucket) - if err == nil { - bucketFound = true - } - } - if !bucketFound { - return traceError(errVolumeNotFound) - } - for _, disk := range disks { - err := disk.MakeVol(bucket) - if err == nil { - continue - } - if err != errVolumeExists { - return traceError(err) - } - } - return nil -} diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index a273e7920..cb5e3a8b6 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -17,55 +17,59 @@ package cmd import ( - "path" "sort" "strings" ) -func listDirHealFactory(disks ...StorageAPI) listDirFunc { +func listDirHealFactory(isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc { // Returns sorted merged entries from all the disks. - listDir := func(bucket, prefixDir, prefixEntry string) (mergedentries []string, delayIsLeaf bool, err error) { + listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) { for _, disk := range disks { + if disk == nil { + continue + } var entries []string var newEntries []string entries, err = disk.ListDir(bucket, prefixDir) if err != nil { - // Skip the disk of listDir returns error. continue } + // Listing needs to be sorted. + sort.Strings(entries) + + // Filter entries that have the prefix prefixEntry. + entries = filterMatchingPrefix(entries, prefixEntry) + // isLeaf() check has to happen here so that trailing "/" for objects can be removed. 
for i, entry := range entries { - if strings.HasSuffix(entry, slashSeparator) { - if _, err = disk.StatFile(bucket, path.Join(prefixDir, entry, xlMetaJSONFile)); err == nil { - // If it is an object trim the trailing "/" - entries[i] = strings.TrimSuffix(entry, slashSeparator) - } + if isLeaf(bucket, pathJoin(prefixDir, entry)) { + entries[i] = strings.TrimSuffix(entry, slashSeparator) } } - - if len(mergedentries) == 0 { + // Sort again after removing trailing "/" for objects as the previous sort + // does not hold good anymore. + sort.Strings(entries) + if len(mergedEntries) == 0 { // For the first successful disk.ListDir() - mergedentries = entries - sort.Strings(mergedentries) + mergedEntries = entries + sort.Strings(mergedEntries) continue } - // find elements in entries which are not in mergedentries for _, entry := range entries { - idx := sort.SearchStrings(mergedentries, entry) - if mergedentries[idx] == entry { + idx := sort.SearchStrings(mergedEntries, entry) + if idx < len(mergedEntries) && mergedEntries[idx] == entry { continue } newEntries = append(newEntries, entry) } - if len(newEntries) > 0 { // Merge the entries and sort it. - mergedentries = append(mergedentries, newEntries...) - sort.Strings(mergedentries) + mergedEntries = append(mergedEntries, newEntries...) + sort.Strings(mergedEntries) } } - return mergedentries, false, nil + return mergedEntries, false, nil } return listDir } @@ -83,7 +87,8 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal}) if walkResultCh == nil { endWalkCh = make(chan struct{}) - listDir := listDirHealFactory(xl.storageDisks...) + isLeaf := xl.isObject + listDir := listDirHealFactory(isLeaf, xl.storageDisks...) 
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, nil, endWalkCh) } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index ed3171f77..61ac04041 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -218,7 +218,70 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return nil } -// HealObject - heal the object. +func (xl xlObjects) HealBucket(bucket string) error { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return traceError(BucketNameInvalid{Bucket: bucket}) + } + + // Heal bucket - create buckets on disks where it does not exist. + + // get a random ID for lock instrumentation. + opsID := getOpsID() + + nsMutex.Lock(bucket, "", opsID) + defer nsMutex.Unlock(bucket, "", opsID) + + // Initialize sync waitgroup. + var wg = &sync.WaitGroup{} + + // Initialize list of errors. + var dErrs = make([]error, len(xl.storageDisks)) + + // Make a volume entry on all underlying storage disks. + for index, disk := range xl.storageDisks { + if disk == nil { + dErrs[index] = traceError(errDiskNotFound) + continue + } + wg.Add(1) + // Make a volume inside a go-routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + if _, err := disk.StatVol(bucket); err != nil { + if err != errVolumeNotFound { + dErrs[index] = traceError(err) + return + } + if err = disk.MakeVol(bucket); err != nil { + dErrs[index] = traceError(err) + } + } + }(index, disk) + } + + // Wait for all make vol to finish. + wg.Wait() + + // Do we have write quorum?. + if !isDiskQuorum(dErrs, xl.writeQuorum) { + // Purge successfully created buckets if we don't have writeQuorum. + xl.undoMakeBucket(bucket) + return toObjectErr(traceError(errXLWriteQuorum), bucket) + } + + // Verify we have any other errors which should be returned as failure. 
+ if reducedErr := reduceErrs(dErrs, []error{ + errDiskNotFound, + errFaultyDisk, + errDiskAccessDenied, + }); reducedErr != nil { + return toObjectErr(reducedErr, bucket) + } + return nil +} + +// HealObject heals a given object for all its missing entries. // FIXME: If an object object was deleted and one disk was down, and later the disk comes back // up again, heal on the object should delete it. func (xl xlObjects) HealObject(bucket, object string) error { @@ -226,14 +289,9 @@ func (xl xlObjects) HealObject(bucket, object string) error { if !IsValidBucketName(bucket) { return traceError(BucketNameInvalid{Bucket: bucket}) } - if object == "" { - // Empty object name indicates that bucket should be healed. - return healBucket(xl.storageDisks, bucket) - } // Verify if object is valid. if !IsValidObjectName(object) { - // FIXME: return Invalid prefix. return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } @@ -277,8 +335,7 @@ func (xl xlObjects) HealObject(bucket, object string) error { outDatedMeta := partsMetadata[index] // Delete all the parts. for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ { - err := disk.DeleteFile(bucket, - pathJoin(object, outDatedMeta.Parts[partIndex].Name)) + err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name)) if err != nil { return traceError(err) } @@ -298,8 +355,8 @@ func (xl xlObjects) HealObject(bucket, object string) error { // We write at temporary location and then rename to fianal location. tmpID := getUUID() - // Checksum of the part files. checkSumInfos[index] will contain checksums of all the part files - // in the outDatedDisks[index] + // Checksum of the part files. checkSumInfos[index] will contain checksums + // of all the part files in the outDatedDisks[index] checkSumInfos := make([][]checkSumInfo, len(outDatedDisks)) // Heal each part. 
erasureHealFile() will write the healed part to @@ -336,6 +393,8 @@ func (xl xlObjects) HealObject(bucket, object string) error { partsMetadata[index] = latestMeta partsMetadata[index].Erasure.Checksum = checkSumInfos[index] } + + // Generate and write `xl.json` generated from other disks. err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks)) if err != nil { return toObjectErr(err, bucket, object) @@ -346,7 +405,13 @@ func (xl xlObjects) HealObject(bucket, object string) error { if disk == nil { continue } - err := disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object)) + // Remove any lingering partial data from current namespace. + err = disk.DeleteFile(bucket, retainSlash(object)) + if err != nil && err != errFileNotFound { + return traceError(err) + } + // Attempt a rename now from healed data to final location. + err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object)) if err != nil { return traceError(err) } diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index 60b1dab9a..cb86a3785 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -265,7 +265,8 @@ func TestPutObjectNoQuorum(t *testing.T) { removeRoots(fsDirs) } -func TestHealObject(t *testing.T) { +// Tests both object and bucket healing. +func TestHealing(t *testing.T) { obj, fsDirs, err := prepareXL() if err != nil { t.Fatal(err) @@ -346,14 +347,14 @@ func TestHealObject(t *testing.T) { t.Fatal("HealObject failed") } - // Remove the bucket - to simulate the case where bucket was created when the - // disk was down. + // Remove the bucket - to simulate the case where bucket was + // created when the disk was down. err = os.RemoveAll(path.Join(fsDirs[0], bucket)) if err != nil { t.Fatal(err) } // This would create the bucket. 
- err = xl.HealObject(bucket, "") + err = xl.HealBucket(bucket) if err != nil { t.Fatal(err) } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 34dc85fea..069d662dc 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -75,7 +75,7 @@ var xlTreeWalkIgnoredErrs = []error{ errFaultyDisk, } -func repairDiskMetadata(storageDisks []StorageAPI) error { +func healFormatXL(storageDisks []StorageAPI) error { // Attempt to load all `format.json`. formatConfigs, sErrs := loadAllFormats(storageDisks) @@ -116,14 +116,17 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { return nil, err } + readQuorum := len(storageDisks) / 2 + writeQuorum := len(storageDisks)/2 + 1 + // Load saved XL format.json and validate. - newPosixDisks, err := loadFormatXL(storageDisks) + newStorageDisks, err := loadFormatXL(storageDisks, readQuorum) if err != nil { return nil, fmt.Errorf("Unable to recognize backend format, %s", err) } // Calculate data and parity blocks. - dataBlocks, parityBlocks := len(newPosixDisks)/2, len(newPosixDisks)/2 + dataBlocks, parityBlocks := len(newStorageDisks)/2, len(newStorageDisks)/2 // Initialize object cache. objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry) @@ -133,7 +136,7 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { // Initialize xl objects. xl := xlObjects{ - storageDisks: newPosixDisks, + storageDisks: newStorageDisks, dataBlocks: dataBlocks, parityBlocks: parityBlocks, listPool: listPool, @@ -143,8 +146,8 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { // Figure out read and write quorum based on number of storage disks. // READ and WRITE quorum is always set to (N/2) number of disks. - xl.readQuorum = len(xl.storageDisks) / 2 - xl.writeQuorum = len(xl.storageDisks)/2 + 1 + xl.readQuorum = readQuorum + xl.writeQuorum = writeQuorum // Return successfully initialized object layer. 
return xl, nil @@ -156,16 +159,6 @@ func (xl xlObjects) Shutdown() error { return nil } -// HealDiskMetadata function for object storage interface. -func (xl xlObjects) HealDiskMetadata() error { - // get a random ID for lock instrumentation. - opsID := getOpsID() - - nsMutex.Lock(minioMetaBucket, formatConfigFile, opsID) - defer nsMutex.Unlock(minioMetaBucket, formatConfigFile, opsID) - return repairDiskMetadata(xl.storageDisks) -} - // byDiskTotal is a collection satisfying sort.Interface. type byDiskTotal []disk.Info