From ae936a01474922f19169c231023fa063fd75f1ae Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Wed, 29 Jun 2016 02:10:40 -0700
Subject: [PATCH] XL: Relax write quorum further to N/2 + 1. (#2018)

This changes behavior in some parts of the code; those call sites are
addressed here as well.

Fixes #2016
---
 fs-v1-metadata.go | 2 +-
 fs-v1-multipart-common.go | 57 ++++++++++
 fs-v1-multipart.go | 6 +-
 object-api-multipart_test.go | 2 +-
 object-multipart-common.go | 166 +++++++++++++++++++++++++++
 xl-v1-metadata.go | 4 +-
 xl-v1-multipart-common.go | 210 +++-------------------------------
 xl-v1-multipart.go | 6 +-
 xl-v1.go | 11 +-
 9 files changed, 249 insertions(+), 215 deletions(-)
 create mode 100644 object-multipart-common.go

diff --git a/fs-v1-metadata.go b/fs-v1-metadata.go
index d8b799ca5..7399b446b 100644
--- a/fs-v1-metadata.go
+++ b/fs-v1-metadata.go
@@ -76,7 +76,7 @@ func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, er
 // newFSMetaV1 - initializes new fsMetaV1.
 func newFSMetaV1() (fsMeta fsMetaV1) {
 	fsMeta = fsMetaV1{}
-	fsMeta.Version = "1"
+	fsMeta.Version = "1.0.0"
 	fsMeta.Format = "fs"
 	fsMeta.Minio.Release = minioReleaseTag
 	return fsMeta
diff --git a/fs-v1-multipart-common.go b/fs-v1-multipart-common.go
index 96ecea0bf..154fe1b72 100644
--- a/fs-v1-multipart-common.go
+++ b/fs-v1-multipart-common.go
@@ -17,8 +17,10 @@
 package main

 import (
+	"encoding/json"
 	"path"
 	"strings"
+	"time"
 )

 // Returns if the prefix is a multipart upload.
@@ -68,3 +70,58 @@ func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool {
 	}
 	return true
 }
+
+// writeUploadJSON - create `uploads.json` or update it with new uploadID.
+func (fs fsObjects) writeUploadJSON(bucket, object, uploadID string, initiated time.Time) (err error) {
+	uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
+	uniqueID := getUUID()
+	tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID)
+	var uploadsJSON uploadsV1
+	uploadsJSON, err = readUploadsJSON(bucket, object, fs.storage)
+	if err != nil {
+		// Return any error other than errFileNotFound.
+		if err != errFileNotFound {
+			return err
+		}
+		// Set uploads format to `fs`.
+		uploadsJSON = newUploadsV1("fs")
+	}
+	// Add a new upload id.
+	uploadsJSON.AddUploadID(uploadID, initiated)

+	// Serialize the updated `uploads.json`.
+	uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON)
+	if wErr != nil {
+		return wErr
+	}
+	// Write `uploads.json` to disk.
+	if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil {
+		return wErr
+	}
+	wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
+	if wErr != nil {
+		if dErr := fs.storage.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil {
+			return dErr
+		}
+		return wErr
+	}
+	return nil
+}
+
+// updateUploadsJSON - update `uploads.json` with the new uploadsJSON.
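+// The updated content is first written to a tmp path and then renamed into
+// place, so a failed write never leaves a truncated `uploads.json` behind
+// (this assumes RenameFile replaces the destination atomically).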
+func (fs fsObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1) (err error) { + uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) + uniqueID := getUUID() + tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) + uploadsBytes, wErr := json.Marshal(uploadsJSON) + if wErr != nil { + return wErr + } + if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil { + return wErr + } + if wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil { + return wErr + } + return nil +} diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index 6d9fdf796..429ab3072 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -222,7 +222,7 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st uploadID = getUUID() initiated := time.Now().UTC() // Create 'uploads.json' - if err = writeUploadJSON(bucket, object, uploadID, initiated, fs.storage); err != nil { + if err = fs.writeUploadJSON(bucket, object, uploadID, initiated); err != nil { return "", err } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -573,7 +573,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) } if len(uploadsJSON.Uploads) > 0 { - if err = updateUploadsJSON(bucket, object, uploadsJSON, fs.storage); err != nil { + if err = fs.updateUploadsJSON(bucket, object, uploadsJSON); err != nil { return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) } // Return success. @@ -609,7 +609,7 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error // There are pending uploads for the same object, preserve // them update 'uploads.json' in-place. if len(uploadsJSON.Uploads) > 0 { - err = updateUploadsJSON(bucket, object, uploadsJSON, fs.storage) + err = fs.updateUploadsJSON(bucket, object, uploadsJSON) if err != nil { return toObjectErr(err, bucket, object) } diff --git a/object-api-multipart_test.go b/object-api-multipart_test.go index 303bf8151..9086eef07 100644 --- a/object-api-multipart_test.go +++ b/object-api-multipart_test.go @@ -129,7 +129,7 @@ func testPutObjectPartDiskNotFound(obj ObjectLayer, instanceType string, disks [ } // Remove some random disk. - for _, disk := range disks[:6] { + for _, disk := range disks[:7] { removeAll(disk) } diff --git a/object-multipart-common.go b/object-multipart-common.go new file mode 100644 index 000000000..a66fe5a52 --- /dev/null +++ b/object-multipart-common.go @@ -0,0 +1,166 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "encoding/json" + "path" + "sort" + "sync" + "time" +) + +// A uploadInfo represents the s3 compatible spec. +type uploadInfo struct { + UploadID string `json:"uploadId"` // UploadID for the active multipart upload. 
+ Deleted bool `json:"deleted"` // Currently unused, for future use. + Initiated time.Time `json:"initiated"` // Indicates when the uploadID was initiated. +} + +// A uploadsV1 represents `uploads.json` metadata header. +type uploadsV1 struct { + Version string `json:"version"` // Version of the current `uploads.json` + Format string `json:"format"` // Format of the current `uploads.json` + Uploads []uploadInfo `json:"uploadIds"` // Captures all the upload ids for a given object. +} + +// byInitiatedTime is a collection satisfying sort.Interface. +type byInitiatedTime []uploadInfo + +func (t byInitiatedTime) Len() int { return len(t) } +func (t byInitiatedTime) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t byInitiatedTime) Less(i, j int) bool { + return t[i].Initiated.Before(t[j].Initiated) +} + +// AddUploadID - adds a new upload id in order of its initiated time. +func (u *uploadsV1) AddUploadID(uploadID string, initiated time.Time) { + u.Uploads = append(u.Uploads, uploadInfo{ + UploadID: uploadID, + Initiated: initiated, + }) + sort.Sort(byInitiatedTime(u.Uploads)) +} + +// Index - returns the index of matching the upload id. +func (u uploadsV1) Index(uploadID string) int { + for i, u := range u.Uploads { + if u.UploadID == uploadID { + return i + } + } + return -1 +} + +// readUploadsJSON - get all the saved uploads JSON. +func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) { + uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) + // Reads entire `uploads.json`. + buf, err := disk.ReadAll(minioMetaBucket, uploadJSONPath) + if err != nil { + return uploadsV1{}, err + } + + // Decode `uploads.json`. + if err = json.Unmarshal(buf, &uploadIDs); err != nil { + return uploadsV1{}, err + } + + // Success. + return uploadIDs, nil +} + +// newUploadsV1 - initialize new uploads v1. +func newUploadsV1(format string) uploadsV1 { + uploadIDs := uploadsV1{} + uploadIDs.Version = "1.0.0" // Should follow semantic versioning. + uploadIDs.Format = format + return uploadIDs +} + +// Wrapper which removes all the uploaded parts. +func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error { + var errs = make([]error, len(storageDisks)) + var wg = &sync.WaitGroup{} + + // Construct uploadIDPath. + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + + // Cleanup uploadID for all disks. + for index, disk := range storageDisks { + if disk == nil { + errs[index] = errDiskNotFound + continue + } + wg.Add(1) + // Cleanup each uploadID in a routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + err := cleanupDir(disk, minioMetaBucket, uploadIDPath) + if err != nil { + errs[index] = err + return + } + errs[index] = nil + }(index, disk) + } + + // Wait for all the cleanups to finish. + wg.Wait() + + // Return first error. + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'. +func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]uploadMetadata, bool, error) { + var uploads []uploadMetadata + // Read `uploads.json`. 
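+	// readUploadsJSON propagates ReadAll errors verbatim, so a missing
+	// `uploads.json` reaches the caller as errFileNotFound rather than
+	// being mapped to an empty listing here.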
+ uploadsJSON, err := readUploadsJSON(bucketName, objectName, disk) + if err != nil { + return nil, false, err + } + index := 0 + if uploadIDMarker != "" { + for ; index < len(uploadsJSON.Uploads); index++ { + if uploadsJSON.Uploads[index].UploadID == uploadIDMarker { + // Skip the uploadID as it would already be listed in previous listing. + index++ + break + } + } + } + for index < len(uploadsJSON.Uploads) { + uploads = append(uploads, uploadMetadata{ + Object: objectName, + UploadID: uploadsJSON.Uploads[index].UploadID, + Initiated: uploadsJSON.Uploads[index].Initiated, + }) + count-- + index++ + if count == 0 { + break + } + } + end := (index == len(uploadsJSON.Uploads)) + return uploads, end, nil +} diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index a061bc0ef..9a48d773a 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -111,7 +111,7 @@ type xlMetaV1 struct { // fresh erasure info. func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) { xlMeta = xlMetaV1{} - xlMeta.Version = "1" + xlMeta.Version = "1.0.0" xlMeta.Format = "xl" xlMeta.Minio.Release = minioReleaseTag xlMeta.Erasure = erasureInfo{ @@ -127,7 +127,7 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) { // IsValid - tells if the format is sane by validating the version // string and format style. func (m xlMetaV1) IsValid() bool { - return m.Version == "1" && m.Format == "xl" + return m.Version == "1.0.0" && m.Format == "xl" } // ObjectPartIndex - returns the index of matching object part number. diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index b5e5cdb74..e36b34a84 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -19,82 +19,21 @@ package main import ( "encoding/json" "path" - "sort" "strings" "sync" "time" ) -// A uploadInfo represents the s3 compatible spec. -type uploadInfo struct { - UploadID string `json:"uploadId"` // UploadID for the active multipart upload. - Deleted bool `json:"deleted"` // Currently unused, for future use. - Initiated time.Time `json:"initiated"` // Indicates when the uploadID was initiated. -} - -// A uploadsV1 represents `uploads.json` metadata header. -type uploadsV1 struct { - Version string `json:"version"` // Version of the current `uploads.json` - Format string `json:"format"` // Format of the current `uploads.json` - Uploads []uploadInfo `json:"uploadIds"` // Captures all the upload ids for a given object. -} - -// byInitiatedTime is a collection satisfying sort.Interface. -type byInitiatedTime []uploadInfo - -func (t byInitiatedTime) Len() int { return len(t) } -func (t byInitiatedTime) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t byInitiatedTime) Less(i, j int) bool { - return t[i].Initiated.Before(t[j].Initiated) -} - -// AddUploadID - adds a new upload id in order of its initiated time. -func (u *uploadsV1) AddUploadID(uploadID string, initiated time.Time) { - u.Uploads = append(u.Uploads, uploadInfo{ - UploadID: uploadID, - Initiated: initiated, - }) - sort.Sort(byInitiatedTime(u.Uploads)) -} - -// Index - returns the index of matching the upload id. -func (u uploadsV1) Index(uploadID string) int { - for i, u := range u.Uploads { - if u.UploadID == uploadID { - return i - } - } - return -1 -} - -// readUploadsJSON - get all the saved uploads JSON. -func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) { - uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) - // Reads entire `uploads.json`. 
- buf, err := disk.ReadAll(minioMetaBucket, uploadJSONPath) - if err != nil { - return uploadsV1{}, err - } - - // Decode `uploads.json`. - if err = json.Unmarshal(buf, &uploadIDs); err != nil { - return uploadsV1{}, err - } - - // Success. - return uploadIDs, nil -} - // updateUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. -func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error { +func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1) (err error) { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uniqueID := getUUID() tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) - var errs = make([]error, len(storageDisks)) + var errs = make([]error, len(xl.storageDisks)) var wg = &sync.WaitGroup{} // Update `uploads.json` for all the disks. - for index, disk := range storageDisks { + for index, disk := range xl.storageDisks { if disk == nil { errs[index] = errDiskNotFound continue @@ -122,25 +61,10 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk // Wait for all the routines to finish updating `uploads.json` wg.Wait() - // For only single disk return first error. - if len(storageDisks) == 1 { - return errs[0] - } // else count all the errors for quorum validation. - var errCount = 0 - // Return for first error. - for _, err := range errs { - if err != nil { - errCount++ - } - } // Count all the errors and validate if we have write quorum. - if errCount > len(storageDisks)-len(storageDisks)/2+3 { - // Validate if we have read quorum return success. - if errCount > len(storageDisks)-len(storageDisks)/2+1 { - return nil - } + if !isQuorum(errs, xl.writeQuorum) { // Rename `uploads.json` left over back to tmp location. - for index, disk := range storageDisks { + for index, disk := range xl.storageDisks { if disk == nil { continue } @@ -160,25 +84,17 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk return nil } -// newUploadsV1 - initialize new uploads v1. -func newUploadsV1(format string) uploadsV1 { - uploadIDs := uploadsV1{} - uploadIDs.Version = "1" - uploadIDs.Format = format - return uploadIDs -} - // writeUploadJSON - create `uploads.json` or update it with new uploadID. -func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, storageDisks ...StorageAPI) (err error) { +func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated time.Time) (err error) { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uniqueID := getUUID() tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) - var errs = make([]error, len(storageDisks)) + var errs = make([]error, len(xl.storageDisks)) var wg = &sync.WaitGroup{} var uploadsJSON uploadsV1 - for _, disk := range storageDisks { + for _, disk := range xl.storageDisks { if disk == nil { continue } @@ -190,19 +106,14 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora if err != errFileNotFound { return err } - if len(storageDisks) == 1 { - // Set uploads format to `fs` for single disk. - uploadsJSON = newUploadsV1("fs") - } else { - // Set uploads format to `xl` otherwise. - uploadsJSON = newUploadsV1("xl") - } + // Set uploads format to `xl` otherwise. + uploadsJSON = newUploadsV1("xl") } // Add a new upload id. uploadsJSON.AddUploadID(uploadID, initiated) // Update `uploads.json` on all disks. 
- for index, disk := range storageDisks { + for index, disk := range xl.storageDisks { if disk == nil { errs[index] = errDiskNotFound continue @@ -234,28 +145,13 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora }(index, disk) } - // Wait for all the writes to finish. + // Wait here for all the writes to finish. wg.Wait() - // For only single disk return first error. - if len(storageDisks) == 1 { - return errs[0] - } // else count all the errors for quorum validation. - var errCount = 0 - // Return for first error. - for _, err := range errs { - if err != nil { - errCount++ - } - } // Count all the errors and validate if we have write quorum. - if errCount > len(storageDisks)-len(storageDisks)/2+3 { - // Validate if we have read quorum return success. - if errCount > len(storageDisks)-len(storageDisks)/2+1 { - return nil - } + if !isQuorum(errs, xl.writeQuorum) { // Rename `uploads.json` left over back to tmp location. - for index, disk := range storageDisks { + for index, disk := range xl.storageDisks { if disk == nil { continue } @@ -275,79 +171,6 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora return nil } -// Wrapper which removes all the uploaded parts. -func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error { - var errs = make([]error, len(storageDisks)) - var wg = &sync.WaitGroup{} - - // Construct uploadIDPath. - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - - // Cleanup uploadID for all disks. - for index, disk := range storageDisks { - if disk == nil { - errs[index] = errDiskNotFound - continue - } - wg.Add(1) - // Cleanup each uploadID in a routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - err := cleanupDir(disk, minioMetaBucket, uploadIDPath) - if err != nil { - errs[index] = err - return - } - errs[index] = nil - }(index, disk) - } - - // Wait for all the cleanups to finish. - wg.Wait() - - // Return first error. - for _, err := range errs { - if err != nil { - return err - } - } - return nil -} - -// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'. -func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]uploadMetadata, bool, error) { - var uploads []uploadMetadata - // Read `uploads.json`. - uploadsJSON, err := readUploadsJSON(bucketName, objectName, disk) - if err != nil { - return nil, false, err - } - index := 0 - if uploadIDMarker != "" { - for ; index < len(uploadsJSON.Uploads); index++ { - if uploadsJSON.Uploads[index].UploadID == uploadIDMarker { - // Skip the uploadID as it would already be listed in previous listing. - index++ - break - } - } - } - for index < len(uploadsJSON.Uploads) { - uploads = append(uploads, uploadMetadata{ - Object: objectName, - UploadID: uploadsJSON.Uploads[index].UploadID, - Initiated: uploadsJSON.Uploads[index].Initiated, - }) - count-- - index++ - if count == 0 { - break - } - } - end := (index == len(uploadsJSON.Uploads)) - return uploads, end, nil -} - // Returns if the prefix is a multipart upload. func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { for _, disk := range xl.getLoadBalancedQuorumDisks() { @@ -476,11 +299,6 @@ func (xl xlObjects) commitXLMetadata(srcPrefix, dstPrefix string) error { // Do we have write quorum?. if !isQuorum(mErrs, xl.writeQuorum) { - // Do we have read quorum?. - if isQuorum(mErrs, xl.readQuorum) { - // Return success on read quorum. 
- return nil - } return errXLWriteQuorum } // For all other errors return. diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 917b6755a..707c931f4 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -259,7 +259,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st uploadID = getUUID() initiated := time.Now().UTC() // Create 'uploads.json' - if err = writeUploadJSON(bucket, object, uploadID, initiated, xl.storageDisks...); err != nil { + if err = xl.writeUploadJSON(bucket, object, uploadID, initiated); err != nil { return "", err } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -713,7 +713,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) } if len(uploadsJSON.Uploads) > 0 { - if err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...); err != nil { + if err = xl.updateUploadsJSON(bucket, object, uploadsJSON); err != nil { return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) } // Return success. @@ -764,7 +764,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e if len(uploadsJSON.Uploads) > 0 { // There are pending uploads for the same object, preserve // them update 'uploads.json' in-place. - err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...) + err = xl.updateUploadsJSON(bucket, object, uploadsJSON) if err != nil { return toObjectErr(err, bucket, object) } diff --git a/xl-v1.go b/xl-v1.go index 50711eaac..b886106e9 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -171,16 +171,9 @@ func newXLObjects(disks []string) (ObjectLayer, error) { } // Figure out read and write quorum based on number of storage disks. - // Read quorum should be always N/2 + 1 (due to Vandermonde matrix - // erasure requirements) + // READ and WRITE quorum is always set to (N/2 + 1) number of disks. xl.readQuorum = len(xl.storageDisks)/2 + 1 - - // Write quorum is assumed if we have total disks + 2 - // parity. - xl.writeQuorum = len(xl.storageDisks)/2 + 2 - if xl.writeQuorum > len(xl.storageDisks) { - xl.writeQuorum = len(xl.storageDisks) - } + xl.writeQuorum = len(xl.storageDisks)/2 + 1 // Return successfully initialized object layer. return xl, nil
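
The hunks above gate success on isQuorum(errs, xl.writeQuorum), but isQuorum
is defined elsewhere in the tree and its body is not part of this patch. The
standalone Go sketch below assumes it simply counts successful (nil) results
against the required quorum, and assumes the 16-disk setup commonly used by
the XL tests; both are assumptions, not code from this change. It illustrates
why object-api-multipart_test.go can now remove 7 disks instead of 6 and
still satisfy write quorum.

package main

import (
	"errors"
	"fmt"
)

var errDiskNotFound = errors.New("disk not found")

// isQuorum - assumed semantics: reports whether the number of
// successful (nil) entries in errs meets or exceeds quorum.
func isQuorum(errs []error, quorum int) bool {
	success := 0
	for _, err := range errs {
		if err == nil {
			success++
		}
	}
	return success >= quorum
}

func main() {
	disks := 16                   // assumed disk count of the XL test setup
	oldWriteQuorum := disks/2 + 2 // 10 before this patch
	newWriteQuorum := disks/2 + 1 // 9 after this patch

	// Simulate 7 of 16 disks failing, as in the updated
	// testPutObjectPartDiskNotFound.
	errs := make([]error, disks)
	for i := 0; i < 7; i++ {
		errs[i] = errDiskNotFound
	}

	fmt.Println(isQuorum(errs, oldWriteQuorum)) // false: 9 successes < 10
	fmt.Println(isQuorum(errs, newWriteQuorum)) // true: 9 successes >= 9
}

Under the old write quorum of N/2 + 2 the same failure pattern falls one disk
short, which is exactly the behavior change the relaxed test now exercises.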