diff --git a/cmd/auth-rpc-server_test.go b/cmd/auth-rpc-server_test.go index 0f5aa3416..671fc8bee 100644 --- a/cmd/auth-rpc-server_test.go +++ b/cmd/auth-rpc-server_test.go @@ -52,7 +52,7 @@ func TestLogin(t *testing.T) { { args: LoginRPCArgs{ AuthToken: token, - Version: semVersion{3, 0, 0}, + Version: semVersion{1, 0, 0}, }, skewTime: 0, expectedErr: errRPCAPIVersionUnsupported, diff --git a/cmd/format-xl.go b/cmd/format-xl.go index ee6cc47d8..9541db86a 100644 --- a/cmd/format-xl.go +++ b/cmd/format-xl.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "os" "sync" "encoding/hex" @@ -39,6 +40,9 @@ const ( // formatXLV2.XL.Version - version '2'. formatXLVersionV2 = "2" + // formatXLV3.XL.Version - version '3'. + formatXLVersionV3 = "3" + // Distribution algorithm used. formatXLVersionV2DistributionAlgo = "CRCMOD" ) @@ -98,12 +102,32 @@ type formatXLV2 struct { } `json:"xl"` } +// formatXLV3 struct is same as formatXLV2 struct except that formatXLV3.XL.Version is "3" indicating +// the simplified multipart backend which is a flat hierarchy now. +// In .minio.sys/multipart we have: +// sha256(bucket/object)/uploadID/[xl.json, part.1, part.2 ....] +type formatXLV3 struct { + Version string `json:"version"` + Format string `json:"format"` + XL struct { + Version string `json:"version"` // Version of 'xl' format. + This string `json:"this"` // This field carries assigned disk uuid. + // Sets field carries the input disk order generated the first + // time when fresh disks were supplied, it is a two dimensional + // array second dimension represents list of disks used per set. + Sets [][]string `json:"sets"` + // Distribution algorithm represents the hashing algorithm + // to pick the right set index for an object. + DistributionAlgo string `json:"distributionAlgo"` + } `json:"xl"` +} + // Returns formatXL.XL.Version -func newFormatXLV2(numSets int, setLen int) *formatXLV2 { - format := &formatXLV2{} +func newFormatXLV3(numSets int, setLen int) *formatXLV3 { + format := &formatXLV3{} format.Version = formatMetaVersionV1 format.Format = formatBackendXL - format.XL.Version = formatXLVersionV2 + format.XL.Version = formatXLVersionV3 format.XL.DistributionAlgo = formatXLVersionV2DistributionAlgo format.XL.Sets = make([][]string, numSets) @@ -171,7 +195,12 @@ func formatXLMigrate(export string) error { } fallthrough case formatXLVersionV2: - // V2 is the latest version. + if err = formatXLMigrateV2ToV3(export); err != nil { + return err + } + fallthrough + case formatXLVersionV3: + // format-V3 is the latest verion. return nil } return fmt.Errorf(`%s: unknown format version %s`, export, version) @@ -198,8 +227,13 @@ func formatXLMigrateV1ToV2(export string) error { return err } - formatV2 := newFormatXLV2(1, len(formatV1.XL.JBOD)) - formatV2.XL.This = formatV1.XL.Disk + formatV2 := &formatXLV2{} + formatV2.Version = formatMetaVersionV1 + formatV2.Format = formatBackendXL + formatV2.XL.Version = formatXLVersionV2 + formatV2.XL.DistributionAlgo = formatXLVersionV2DistributionAlgo + formatV2.XL.Sets = make([][]string, 1) + formatV2.XL.Sets[0] = make([]string, len(formatV1.XL.JBOD)) copy(formatV2.XL.Sets[0], formatV1.XL.JBOD) b, err = json.Marshal(formatV2) @@ -209,6 +243,50 @@ func formatXLMigrateV1ToV2(export string) error { return ioutil.WriteFile(formatPath, b, 0644) } +// Migrates V2 for format.json to V3 (Flat hierarchy for multipart) +func formatXLMigrateV2ToV3(export string) error { + formatPath := pathJoin(export, minioMetaBucket, formatConfigFile) + version, err := formatXLGetVersion(formatPath) + if err != nil { + return err + } + if version != formatXLVersionV2 { + return fmt.Errorf(`Disk %s: format version expected %s, found %s`, export, formatXLVersionV2, version) + } + formatV2 := &formatXLV2{} + b, err := ioutil.ReadFile(formatPath) + if err != nil { + return err + } + err = json.Unmarshal(b, formatV2) + if err != nil { + return err + } + + if err = os.RemoveAll(pathJoin(export, minioMetaMultipartBucket)); err != nil { + return err + } + if err = os.MkdirAll(pathJoin(export, minioMetaMultipartBucket), 0755); err != nil { + return err + } + + // format-V2 struct is exactly same as format-V1 except that version is "3" + // which indicates the simplified multipart backend. + formatV3 := formatXLV3{} + + formatV3.Version = formatV2.Version + formatV3.Format = formatV2.Format + formatV3.XL = formatV2.XL + + formatV3.XL.Version = formatXLVersionV3 + + b, err = json.Marshal(formatV3) + if err != nil { + return err + } + return ioutil.WriteFile(formatPath, b, 0644) +} + // Returns true, if one of the errors is non-nil. func hasAnyErrors(errs []error) bool { for _, err := range errs { @@ -236,7 +314,7 @@ func shouldInitXLDisks(errs []error) bool { } // loadFormatXLAll - load all format config from all input disks in parallel. -func loadFormatXLAll(endpoints EndpointList) ([]*formatXLV2, []error) { +func loadFormatXLAll(endpoints EndpointList) ([]*formatXLV3, []error) { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -253,7 +331,7 @@ func loadFormatXLAll(endpoints EndpointList) ([]*formatXLV2, []error) { var sErrs = make([]error, len(bootstrapDisks)) // Initialize format configs. - var formats = make([]*formatXLV2, len(bootstrapDisks)) + var formats = make([]*formatXLV3, len(bootstrapDisks)) // Load format from each disk in parallel for index, disk := range bootstrapDisks { @@ -303,7 +381,7 @@ func undoSaveFormatXLAll(disks []StorageAPI) { wg.Wait() } -func saveFormatXL(disk StorageAPI, format *formatXLV2) error { +func saveFormatXL(disk StorageAPI, format interface{}) error { // Marshal and write to disk. formatBytes, err := json.Marshal(format) if err != nil { @@ -323,7 +401,7 @@ func saveFormatXL(disk StorageAPI, format *formatXLV2) error { } // loadFormatXL - loads format.json from disk. -func loadFormatXL(disk StorageAPI) (format *formatXLV2, err error) { +func loadFormatXL(disk StorageAPI) (format *formatXLV3, err error) { buf, err := disk.ReadAll(minioMetaBucket, formatConfigFile) if err != nil { // 'file not found' and 'volume not found' as @@ -348,7 +426,7 @@ func loadFormatXL(disk StorageAPI) (format *formatXLV2, err error) { } // Try to decode format json into formatConfigV1 struct. - format = &formatXLV2{} + format = &formatXLV3{} if err = json.Unmarshal(buf, format); err != nil { return nil, err } @@ -358,7 +436,7 @@ func loadFormatXL(disk StorageAPI) (format *formatXLV2, err error) { } // Valid formatXL basic versions. -func checkFormatXLValue(formatXL *formatXLV2) error { +func checkFormatXLValue(formatXL *formatXLV3) error { // Validate format version and format type. if formatXL.Version != formatMetaVersionV1 { return fmt.Errorf("Unsupported version of backend format [%s] found", formatXL.Version) @@ -366,14 +444,14 @@ func checkFormatXLValue(formatXL *formatXLV2) error { if formatXL.Format != formatBackendXL { return fmt.Errorf("Unsupported backend format [%s] found", formatXL.Format) } - if formatXL.XL.Version != formatXLVersionV2 { + if formatXL.XL.Version != formatXLVersionV3 { return fmt.Errorf("Unsupported XL backend format found [%s]", formatXL.XL.Version) } return nil } // Check all format values. -func checkFormatXLValues(formats []*formatXLV2) error { +func checkFormatXLValues(formats []*formatXLV3) error { for i, formatXL := range formats { if formatXL == nil { continue @@ -390,7 +468,7 @@ func checkFormatXLValues(formats []*formatXLV2) error { } // Get backend XL format in quorum `format.json`. -func getFormatXLInQuorum(formats []*formatXLV2) (*formatXLV2, error) { +func getFormatXLInQuorum(formats []*formatXLV3) (*formatXLV3, error) { formatHashes := make([]string, len(formats)) for i, format := range formats { if format == nil { @@ -437,7 +515,7 @@ func getFormatXLInQuorum(formats []*formatXLV2) (*formatXLV2, error) { return nil, errXLReadQuorum } -func formatXLV2Check(reference *formatXLV2, format *formatXLV2) error { +func formatXLV3Check(reference *formatXLV3, format *formatXLV3) error { tmpFormat := *format this := tmpFormat.XL.This tmpFormat.XL.This = "" @@ -471,7 +549,7 @@ func formatXLV2Check(reference *formatXLV2, format *formatXLV2) error { } // saveFormatXLAll - populates `format.json` on disks in its order. -func saveFormatXLAll(endpoints EndpointList, formats []*formatXLV2) error { +func saveFormatXLAll(endpoints EndpointList, formats []*formatXLV3) error { storageDisks, err := initStorageDisks(endpoints) if err != nil { return err @@ -488,7 +566,7 @@ func saveFormatXLAll(endpoints EndpointList, formats []*formatXLV2) error { continue } wg.Add(1) - go func(index int, disk StorageAPI, format *formatXLV2) { + go func(index int, disk StorageAPI, format *formatXLV3) { defer wg.Done() errs[index] = saveFormatXL(disk, format) }(index, disk, formats[index]) @@ -525,9 +603,9 @@ func initStorageDisks(endpoints EndpointList) ([]StorageAPI, error) { } // initFormatXL - save XL format configuration on all disks. -func initFormatXL(endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV2, err error) { - format = newFormatXLV2(setCount, disksPerSet) - formats := make([]*formatXLV2, len(endpoints)) +func initFormatXL(endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) { + format = newFormatXLV3(setCount, disksPerSet) + formats := make([]*formatXLV3, len(endpoints)) for i := 0; i < setCount; i++ { for j := 0; j < disksPerSet; j++ { @@ -574,7 +652,7 @@ func makeFormatXLMetaVolumes(disk StorageAPI) error { var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists) // Initializes meta volume on all input storage disks. -func initFormatXLMetaVolume(endpoints EndpointList, formats []*formatXLV2) error { +func initFormatXLMetaVolume(endpoints EndpointList, formats []*formatXLV3) error { storageDisks, err := initStorageDisks(endpoints) if err != nil { return err @@ -621,7 +699,7 @@ func initFormatXLMetaVolume(endpoints EndpointList, formats []*formatXLV2) error // Get all UUIDs which are present in reference format should // be present in the list of formats provided, those are considered // as online UUIDs. -func getOnlineUUIDs(refFormat *formatXLV2, formats []*formatXLV2) (onlineUUIDs []string) { +func getOnlineUUIDs(refFormat *formatXLV3, formats []*formatXLV3) (onlineUUIDs []string) { for _, format := range formats { if format == nil { continue @@ -640,7 +718,7 @@ func getOnlineUUIDs(refFormat *formatXLV2, formats []*formatXLV2) (onlineUUIDs [ // Look for all UUIDs which are not present in reference format // but are present in the onlineUUIDs list, construct of list such // offline UUIDs. -func getOfflineUUIDs(refFormat *formatXLV2, formats []*formatXLV2) (offlineUUIDs []string) { +func getOfflineUUIDs(refFormat *formatXLV3, formats []*formatXLV3) (offlineUUIDs []string) { onlineUUIDs := getOnlineUUIDs(refFormat, formats) for i, set := range refFormat.XL.Sets { for j, uuid := range set { @@ -659,7 +737,7 @@ func getOfflineUUIDs(refFormat *formatXLV2, formats []*formatXLV2) (offlineUUIDs } // Mark all UUIDs that are offline. -func markUUIDsOffline(refFormat *formatXLV2, formats []*formatXLV2) { +func markUUIDsOffline(refFormat *formatXLV3, formats []*formatXLV3) { offlineUUIDs := getOfflineUUIDs(refFormat, formats) for i, set := range refFormat.XL.Sets { for j := range set { @@ -673,15 +751,15 @@ func markUUIDsOffline(refFormat *formatXLV2, formats []*formatXLV2) { } // Initialize a new set of set formats which will be written to all disks. -func newHealFormatSets(refFormat *formatXLV2, setCount, disksPerSet int, formats []*formatXLV2, errs []error) [][]*formatXLV2 { - newFormats := make([][]*formatXLV2, setCount) +func newHealFormatSets(refFormat *formatXLV3, setCount, disksPerSet int, formats []*formatXLV3, errs []error) [][]*formatXLV3 { + newFormats := make([][]*formatXLV3, setCount) for i := range refFormat.XL.Sets { - newFormats[i] = make([]*formatXLV2, disksPerSet) + newFormats[i] = make([]*formatXLV3, disksPerSet) } for i := range refFormat.XL.Sets { for j := range refFormat.XL.Sets[i] { if errs[i*disksPerSet+j] == errUnformattedDisk || errs[i*disksPerSet+j] == nil { - newFormats[i][j] = &formatXLV2{} + newFormats[i][j] = &formatXLV3{} newFormats[i][j].Version = refFormat.Version newFormats[i][j].Format = refFormat.Format newFormats[i][j].XL.Version = refFormat.XL.Version diff --git a/cmd/format-xl_test.go b/cmd/format-xl_test.go index b2a1272ba..f8978ce9e 100644 --- a/cmd/format-xl_test.go +++ b/cmd/format-xl_test.go @@ -18,6 +18,7 @@ package cmd import ( "encoding/json" + "fmt" "io/ioutil" "os" "testing" @@ -25,8 +26,8 @@ import ( // Test get offline/online uuids. func TestGetUUIDs(t *testing.T) { - fmtV2 := newFormatXLV2(4, 16) - formats := make([]*formatXLV2, 64) + fmtV2 := newFormatXLV3(4, 16) + formats := make([]*formatXLV3, 64) for i := 0; i < 4; i++ { for j := 0; j < 16; j++ { @@ -195,6 +196,14 @@ func TestFormatXLMigrate(t *testing.T) { t.Fatal(err) } + migratedVersion, err := formatXLGetVersion(pathJoin(rootPath, minioMetaBucket, formatConfigFile)) + if err != nil { + t.Fatal(err) + } + if migratedVersion != formatXLVersionV3 { + t.Fatal(fmt.Sprintf("expected version: %s, got: %s", formatXLVersionV3, migratedVersion)) + } + m = &formatXLV1{} m.Format = "unknown" m.Version = formatMetaVersionV1 @@ -218,7 +227,7 @@ func TestFormatXLMigrate(t *testing.T) { m = &formatXLV1{} m.Format = formatBackendXL m.Version = formatMetaVersionV1 - m.XL.Version = "3" + m.XL.Version = "30" m.XL.Disk = mustGetUUID() m.XL.JBOD = []string{m.XL.Disk, mustGetUUID(), mustGetUUID(), mustGetUUID()} @@ -239,12 +248,12 @@ func TestFormatXLMigrate(t *testing.T) { // Tests check format xl value. func TestCheckFormatXLValue(t *testing.T) { testCases := []struct { - format *formatXLV2 + format *formatXLV3 success bool }{ // Invalid XL format version "2". { - &formatXLV2{ + &formatXLV3{ Version: "2", Format: "XL", XL: struct { @@ -260,7 +269,7 @@ func TestCheckFormatXLValue(t *testing.T) { }, // Invalid XL format "Unknown". { - &formatXLV2{ + &formatXLV3{ Version: "1", Format: "Unknown", XL: struct { @@ -276,7 +285,7 @@ func TestCheckFormatXLValue(t *testing.T) { }, // Invalid XL format version "0". { - &formatXLV2{ + &formatXLV3{ Version: "1", Format: "XL", XL: struct { @@ -305,8 +314,8 @@ func TestGetFormatXLInQuorumCheck(t *testing.T) { setCount := 2 disksPerSet := 16 - format := newFormatXLV2(setCount, disksPerSet) - formats := make([]*formatXLV2, 32) + format := newFormatXLV3(setCount, disksPerSet) + formats := make([]*formatXLV3, 32) for i := 0; i < setCount; i++ { for j := 0; j < disksPerSet; j++ { @@ -323,12 +332,12 @@ func TestGetFormatXLInQuorumCheck(t *testing.T) { } // Check if the reference format and input formats are same. - if err = formatXLV2Check(quorumFormat, formats[0]); err != nil { + if err = formatXLV3Check(quorumFormat, formats[0]); err != nil { t.Fatal(err) } // QuorumFormat has .This field empty on purpose, expect a failure. - if err = formatXLV2Check(formats[0], quorumFormat); err == nil { + if err = formatXLV3Check(formats[0], quorumFormat); err == nil { t.Fatal("Unexpected success") } @@ -340,19 +349,19 @@ func TestGetFormatXLInQuorumCheck(t *testing.T) { badFormat := *quorumFormat badFormat.XL.Sets = nil - if err = formatXLV2Check(quorumFormat, &badFormat); err == nil { + if err = formatXLV3Check(quorumFormat, &badFormat); err == nil { t.Fatal("Unexpected success") } badFormatUUID := *quorumFormat badFormatUUID.XL.Sets[0][0] = "bad-uuid" - if err = formatXLV2Check(quorumFormat, &badFormatUUID); err == nil { + if err = formatXLV3Check(quorumFormat, &badFormatUUID); err == nil { t.Fatal("Unexpected success") } badFormatSetSize := *quorumFormat badFormatSetSize.XL.Sets[0] = nil - if err = formatXLV2Check(quorumFormat, &badFormatSetSize); err == nil { + if err = formatXLV3Check(quorumFormat, &badFormatSetSize); err == nil { t.Fatal("Unexpected success") } @@ -371,8 +380,8 @@ func TestNewFormatSets(t *testing.T) { setCount := 2 disksPerSet := 16 - format := newFormatXLV2(setCount, disksPerSet) - formats := make([]*formatXLV2, 32) + format := newFormatXLV3(setCount, disksPerSet) + formats := make([]*formatXLV3, 32) errs := make([]error, 32) for i := 0; i < setCount; i++ { diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 2248fcd40..3e36b9f48 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -174,7 +174,7 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { return nil, fmt.Errorf("Unable to initialize event notification. %s", err) } - go fs.cleanupStaleMultipartUploads(multipartCleanupInterval, multipartExpiry, globalServiceDoneCh) + go fs.cleanupStaleMultipartUploads(globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh) // Return successfully initialized object layer. return fs, nil diff --git a/cmd/globals.go b/cmd/globals.go index 23dfd2f07..5cb8c90ed 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -71,6 +71,11 @@ const ( // Default Read/Write timeouts for each connection. globalConnReadTimeout = 15 * time.Minute // Timeout after 15 minutes of no data sent by the client. globalConnWriteTimeout = 15 * time.Minute // Timeout after 15 minutes if no data received by the client. + + // Expiry duration after which the multipart uploads are deemed stale. + globalMultipartExpiry = time.Hour * 24 * 14 // 2 weeks. + // Cleanup interval when the stale multipart cleanup is initiated. + globalMultipartCleanupInterval = time.Hour * 24 // 24 hrs. ) var ( @@ -167,9 +172,6 @@ var ( // Set to store standard storage class globalStandardStorageClass storageClass - // Current RPC version - globalRPCAPIVersion = semVersion{2, 0, 0} - // Add new variable global values here. ) diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index f120b4e7d..f0e19ee0b 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -49,12 +49,6 @@ func init() { globalObjLayerMutex = &sync.RWMutex{} } -// Check if the disk is remote. -func isRemoteDisk(disk StorageAPI) bool { - _, ok := disk.(*networkStorage) - return ok -} - // Checks if the object is a directory, this logic uses // if size == 0 and object ends with slashSeparator then // returns true. @@ -96,52 +90,6 @@ func deleteBucketMetadata(bucket string, objAPI ObjectLayer) { _ = removeListenerConfig(objAPI, bucket) } -// House keeping code for FS/XL and distributed Minio setup. -func houseKeeping(storageDisks []StorageAPI) error { - var wg = &sync.WaitGroup{} - - // Initialize errs to collect errors inside go-routine. - var errs = make([]error, len(storageDisks)) - - // Initialize all disks in parallel. - for index, disk := range storageDisks { - if disk == nil { - continue - } - // Skip remote disks. - if isRemoteDisk(disk) { - continue - } - wg.Add(1) - go func(index int, disk StorageAPI) { - // Indicate this wait group is done. - defer wg.Done() - - // Cleanup all temp entries upon start. - err := cleanupDir(disk, minioMetaTmpBucket, "") - if err != nil { - if !errors.IsErrIgnored(errors.Cause(err), errDiskNotFound, errVolumeNotFound, errFileNotFound) { - errs[index] = err - } - } - }(index, disk) - } - - // Wait for all cleanup to finish. - wg.Wait() - - // Return upon first error. - for _, err := range errs { - if err == nil { - continue - } - return toObjectErr(err, minioMetaTmpBucket, "*") - } - - // Return success here. - return nil -} - // Depending on the disk type network or local, initialize storage API. func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) { if endpoint.IsLocal { diff --git a/cmd/object-api-common_test.go b/cmd/object-api-common_test.go deleted file mode 100644 index 33b64d281..000000000 --- a/cmd/object-api-common_test.go +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "sync" - "testing" - - "github.com/minio/minio/pkg/errors" -) - -func TestHouseKeeping(t *testing.T) { - fsDirs, err := getRandomDisks(8) - if err != nil { - t.Fatalf("Failed to create disks for storage layer %v", err) - } - defer removeRoots(fsDirs) - - noSpaceDirs, err := getRandomDisks(8) - if err != nil { - t.Fatalf("Failed to create disks for storage layer %v", err) - } - defer removeRoots(noSpaceDirs) - - properStorage := []StorageAPI{} - for _, fsDir := range fsDirs { - var sd StorageAPI - sd, err = newPosix(fsDir) - if err != nil { - t.Fatalf("Failed to create a local disk-based storage layer %v", err) - } - properStorage = append(properStorage, sd) - } - - noSpaceBackend := []StorageAPI{} - for _, noSpaceDir := range noSpaceDirs { - sd, err := newPosix(noSpaceDir) - if err != nil { - t.Fatalf("Failed to create a local disk-based storage layer %v", err) - } - noSpaceBackend = append(noSpaceBackend, sd) - } - noSpaceStorage := prepareNErroredDisks(noSpaceBackend, 5, errDiskFull, t) - - // Create .minio.sys/tmp directory on all disks. - wg := sync.WaitGroup{} - errs := make([]error, len(properStorage)) - for i, store := range properStorage { - wg.Add(1) - go func(index int, store StorageAPI) { - defer wg.Done() - errs[index] = store.MakeVol(minioMetaBucket) - if errs[index] != nil { - return - } - errs[index] = store.MakeVol(minioMetaTmpBucket) - if errs[index] != nil { - return - } - errs[index] = store.AppendFile(minioMetaTmpBucket, "hello.txt", []byte("hello")) - }(i, store) - } - wg.Wait() - for i := range errs { - if errs[i] != nil { - t.Fatalf("Failed to create .minio.sys/tmp directory on disk %v %v", - properStorage[i], errs[i]) - } - } - - nilDiskStorage := []StorageAPI{nil, nil, nil, nil, nil, nil, nil, nil} - testCases := []struct { - store []StorageAPI - expectedErr error - }{ - {properStorage, nil}, - {noSpaceStorage, StorageFull{}}, - {nilDiskStorage, nil}, - } - for i, test := range testCases { - actualErr := errors.Cause(houseKeeping(test.store)) - if actualErr != test.expectedErr { - t.Errorf("Test %d - actual error is %#v, expected error was %#v", - i+1, actualErr, test.expectedErr) - } - } -} diff --git a/cmd/object-api-multipart-common.go b/cmd/object-api-multipart-common.go deleted file mode 100644 index fe6768200..000000000 --- a/cmd/object-api-multipart-common.go +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "encoding/json" - "io" - "io/ioutil" - "path" - "sort" - "time" - - "github.com/minio/minio/pkg/errors" - "github.com/minio/minio/pkg/lock" -) - -const ( - // Expiry duration after which the multipart uploads are deemed stale. - multipartExpiry = time.Hour * 24 * 14 // 2 weeks. - // Cleanup interval when the stale multipart cleanup is initiated. - multipartCleanupInterval = time.Hour * 24 // 24 hrs. -) - -// A uploadInfo represents the s3 compatible spec. -type uploadInfo struct { - UploadID string `json:"uploadId"` // UploadID for the active multipart upload. - Deleted bool `json:"deleted"` // Currently unused, for future use. - Initiated time.Time `json:"initiated"` // Indicates when the uploadID was initiated. -} - -// A uploadsV1 represents `uploads.json` metadata header. -type uploadsV1 struct { - Version string `json:"version"` // Version of the current `uploads.json` - Format string `json:"format"` // Format of the current `uploads.json` - Uploads []uploadInfo `json:"uploadIds"` // Captures all the upload ids for a given object. -} - -// byInitiatedTime is a collection satisfying sort.Interface. -type byInitiatedTime []uploadInfo - -func (t byInitiatedTime) Len() int { return len(t) } -func (t byInitiatedTime) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t byInitiatedTime) Less(i, j int) bool { - return t[i].Initiated.Before(t[j].Initiated) -} - -// AddUploadID - adds a new upload id in order of its initiated time. -func (u *uploadsV1) AddUploadID(uploadID string, initiated time.Time) { - u.Uploads = append(u.Uploads, uploadInfo{ - UploadID: uploadID, - Initiated: initiated, - }) - sort.Sort(byInitiatedTime(u.Uploads)) -} - -// RemoveUploadID - removes upload id from uploads metadata. -func (u *uploadsV1) RemoveUploadID(uploadID string) { - // If the uploadID is absent, we do nothing. - for i, uInfo := range u.Uploads { - if uInfo.UploadID == uploadID { - u.Uploads = append(u.Uploads[:i], u.Uploads[i+1:]...) - break - } - } -} - -// IsEmpty - is true if no more uploads available. -func (u *uploadsV1) IsEmpty() bool { - return len(u.Uploads) == 0 -} - -func (u *uploadsV1) WriteTo(lk *lock.LockedFile) (n int64, err error) { - if err = jsonSave(lk, u); err != nil { - return 0, err - } - fi, err := lk.Stat() - if err != nil { - return 0, err - } - return fi.Size(), nil -} - -func (u *uploadsV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) { - var uploadIDBytes []byte - fi, err := lk.Stat() - if err != nil { - return 0, errors.Trace(err) - } - uploadIDBytes, err = ioutil.ReadAll(io.NewSectionReader(lk, 0, fi.Size())) - if err != nil { - return 0, errors.Trace(err) - } - if len(uploadIDBytes) == 0 { - return 0, errors.Trace(io.EOF) - } - // Decode `uploads.json`. - if err = json.Unmarshal(uploadIDBytes, u); err != nil { - return 0, errors.Trace(err) - } - return int64(len(uploadIDBytes)), nil -} - -// readUploadsJSON - get all the saved uploads JSON. -func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) { - uploadJSONPath := path.Join(bucket, object, uploadsJSONFile) - // Reads entire `uploads.json`. - buf, err := disk.ReadAll(minioMetaMultipartBucket, uploadJSONPath) - if err != nil { - return uploadsV1{}, errors.Trace(err) - } - - // Decode `uploads.json`. - if err = json.Unmarshal(buf, &uploadIDs); err != nil { - return uploadsV1{}, errors.Trace(err) - } - - // Success. - return uploadIDs, nil -} - -// newUploadsV1 - initialize new uploads v1. -func newUploadsV1(format string) uploadsV1 { - uploadIDs := uploadsV1{} - uploadIDs.Version = "1.0.0" // Should follow semantic versioning. - uploadIDs.Format = format - return uploadIDs -} - -func writeUploadJSON(u *uploadsV1, uploadsPath, tmpPath string, disk StorageAPI) error { - // Serialize to prepare to write to disk. - uplBytes, wErr := json.Marshal(&u) - if wErr != nil { - return errors.Trace(wErr) - } - - // Write `uploads.json` to disk. First to tmp location and then rename. - if wErr = disk.AppendFile(minioMetaTmpBucket, tmpPath, uplBytes); wErr != nil { - return errors.Trace(wErr) - } - wErr = disk.RenameFile(minioMetaTmpBucket, tmpPath, minioMetaMultipartBucket, uploadsPath) - if wErr != nil { - if dErr := disk.DeleteFile(minioMetaTmpBucket, tmpPath); dErr != nil { - // we return the most recent error. - return errors.Trace(dErr) - } - return errors.Trace(wErr) - } - return nil -} - -// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'. -func (xl xlObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]MultipartInfo, bool, error) { - var uploads []MultipartInfo - // Read `uploads.json`. - uploadsJSON, err := readUploadsJSON(bucketName, objectName, disk) - if err != nil { - switch errors.Cause(err) { - case errFileNotFound, errFileAccessDenied: - return nil, true, nil - } - return nil, false, err - } - index := 0 - if uploadIDMarker != "" { - for ; index < len(uploadsJSON.Uploads); index++ { - if uploadsJSON.Uploads[index].UploadID == uploadIDMarker { - // Skip the uploadID as it would already be listed in previous listing. - index++ - break - } - } - } - for index < len(uploadsJSON.Uploads) { - uploads = append(uploads, MultipartInfo{ - Object: objectName, - UploadID: uploadsJSON.Uploads[index].UploadID, - Initiated: uploadsJSON.Uploads[index].Initiated, - }) - count-- - index++ - if count == 0 { - break - } - } - end := (index == len(uploadsJSON.Uploads)) - return uploads, end, nil -} - -// List multipart uploads func defines the function signature of list multipart recursive function. -type listMultipartUploadsFunc func(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) - -// Removes multipart uploads if any older than `expiry` duration -// on all buckets for every `cleanupInterval`, this function is -// blocking and should be run in a go-routine. -func cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, obj ObjectLayer, listFn listMultipartUploadsFunc, doneCh chan struct{}) { - ticker := time.NewTicker(cleanupInterval) - for { - select { - case <-doneCh: - // Stop the timer. - ticker.Stop() - return - case <-ticker.C: - bucketInfos, err := obj.ListBuckets() - if err != nil { - errorIf(err, "Unable to list buckets") - continue - } - for _, bucketInfo := range bucketInfos { - cleanupStaleMultipartUpload(bucketInfo.Name, expiry, obj, listFn) - } - } - } -} - -// Removes multipart uploads if any older than `expiry` duration in a given bucket. -func cleanupStaleMultipartUpload(bucket string, expiry time.Duration, obj ObjectLayer, listFn listMultipartUploadsFunc) (err error) { - var lmi ListMultipartsInfo - for { - // List multipart uploads in a bucket 1000 at a time - prefix := "" - lmi, err = listFn(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000) - if err != nil { - errorIf(err, "Unable to list uploads") - return err - } - - // Remove uploads (and its parts) older than expiry duration. - for _, upload := range lmi.Uploads { - if time.Since(upload.Initiated) > expiry { - obj.AbortMultipartUpload(bucket, upload.Object, upload.UploadID) - } - } - - // No more incomplete uploads remain, break and return. - if !lmi.IsTruncated { - break - } - } - - return nil -} diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 315cf3d53..5011ad892 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -45,6 +45,7 @@ var printEndpointError = func() func(Endpoint, error) { } }() +// Migrates backend format of local disks. func formatXLMigrateLocalEndpoints(endpoints EndpointList) error { for _, endpoint := range endpoints { if !endpoint.IsLocal { @@ -64,8 +65,31 @@ func formatXLMigrateLocalEndpoints(endpoints EndpointList) error { return nil } +// Cleans up tmp directory of local disks. +func formatXLCleanupTmpLocalEndpoints(endpoints EndpointList) error { + for _, endpoint := range endpoints { + if !endpoint.IsLocal { + continue + } + formatPath := pathJoin(endpoint.Path, minioMetaBucket, formatConfigFile) + if _, err := os.Stat(formatPath); err != nil { + if os.IsNotExist(err) { + continue + } + return err + } + if err := os.RemoveAll(pathJoin(endpoint.Path, minioMetaTmpBucket)); err != nil { + return err + } + if err := os.MkdirAll(pathJoin(endpoint.Path, minioMetaTmpBucket), 0777); err != nil { + return err + } + } + return nil +} + // Format disks before initialization of object layer. -func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV2, err error) { +func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) { if len(endpoints) == 0 || setCount == 0 || disksPerSet == 0 { return nil, errInvalidArgument } @@ -74,6 +98,10 @@ func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerS return nil, err } + if err = formatXLCleanupTmpLocalEndpoints(endpoints); err != nil { + return nil, err + } + // Done channel is used to close any lingering retry routine, as soon // as this function returns. doneCh := make(chan struct{}) @@ -125,7 +153,7 @@ func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerS if formatConfigs[i] == nil { continue } - if err = formatXLV2Check(format, formatConfigs[i]); err != nil { + if err = formatXLV3Check(format, formatConfigs[i]); err != nil { return nil, fmt.Errorf("%s format error: %s", endpoints[i], err) } } diff --git a/cmd/rpc-common.go b/cmd/rpc-common.go index b33d0999d..ea790bd76 100644 --- a/cmd/rpc-common.go +++ b/cmd/rpc-common.go @@ -27,6 +27,12 @@ import ( // 3 seconds is chosen arbitrarily. const rpcSkewTimeAllowed = 3 * time.Second +// RPC V1 - Initial version +// RPC V2 - format.json XL version changed to 2 +// RPC V3 - format.json XL version changed to 3 +// Current RPC version +var globalRPCAPIVersion = semVersion{3, 0, 0} + func isRequestTimeAllowed(requestTime time.Time) bool { // Check whether request time is within acceptable skew time. utcNow := UTCNow() diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index ef2cf2e5f..f075a58b0 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -44,7 +44,7 @@ type xlSets struct { formatMu sync.RWMutex // Reference format. - format *formatXLV2 + format *formatXLV3 // xlDisks mutex to lock xlDisks. xlDisksMu sync.RWMutex @@ -92,7 +92,7 @@ func (s *xlSets) isConnected(endpoint Endpoint) bool { // Initializes a new StorageAPI from the endpoint argument, returns // StorageAPI and also `format` which exists on the disk. -func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatXLV2, error) { +func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatXLV3, error) { disk, err := newStorageAPI(endpoint) if err != nil { return nil, nil, err @@ -110,8 +110,8 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatXLV2, error) { // findDiskIndex - returns the i,j'th position of the input `format` against the reference // format, after successful validation. -func findDiskIndex(refFormat, format *formatXLV2) (int, int, error) { - if err := formatXLV2Check(refFormat, format); err != nil { +func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) { + if err := formatXLV3Check(refFormat, format); err != nil { return 0, 0, err } @@ -180,7 +180,7 @@ func (s *xlSets) GetDisks(setIndex int) func() []StorageAPI { const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs. // Initialize new set of erasure coded sets. -func newXLSets(endpoints EndpointList, format *formatXLV2, setCount int, drivesPerSet int) (ObjectLayer, error) { +func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesPerSet int) (ObjectLayer, error) { // Initialize the XL sets instance. s := &xlSets{ @@ -205,6 +205,7 @@ func newXLSets(endpoints EndpointList, format *formatXLV2, setCount int, drivesP nsMutex: mutex, bp: bpool.NewBytePoolCap(setCount*drivesPerSet, blockSizeV1, blockSizeV1*2), } + go s.sets[i].cleanupStaleMultipartUploads(globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh) } for _, endpoint := range endpoints { @@ -872,7 +873,7 @@ else fi */ -func formatsToDrivesInfo(endpoints EndpointList, formats []*formatXLV2, sErrs []error) (beforeDrives []madmin.DriveInfo) { +func formatsToDrivesInfo(endpoints EndpointList, formats []*formatXLV3, sErrs []error) (beforeDrives []madmin.DriveInfo) { // Existing formats are available (i.e. ok), so save it in // result, also populate disks to be healed. for i, format := range formats { @@ -1009,7 +1010,7 @@ func (s *xlSets) HealFormat(dryRun bool) (madmin.HealResultItem, error) { } if !dryRun { - var tmpNewFormats = make([]*formatXLV2, s.setCount*s.drivesPerSet) + var tmpNewFormats = make([]*formatXLV3, s.setCount*s.drivesPerSet) for i := range newFormatSets { for j := range newFormatSets[i] { if newFormatSets[i][j] == nil { diff --git a/cmd/xl-v1-metadata_test.go b/cmd/xl-v1-metadata_test.go index 12d1e1604..cd95f7176 100644 --- a/cmd/xl-v1-metadata_test.go +++ b/cmd/xl-v1-metadata_test.go @@ -157,7 +157,7 @@ func testXLReadMetaParts(obj ObjectLayer, instanceType string, disks []string, t } } - uploadIDPath := path.Join(bucketNames[0], objectNames[0], uploadIDs[0]) + uploadIDPath := obj.(*xlObjects).getUploadIDDir(bucketNames[0], objectNames[0], uploadIDs[0]) _, _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath) if err != nil { @@ -175,7 +175,7 @@ func testXLReadMetaParts(obj ObjectLayer, instanceType string, disks []string, t for _, disk := range disks { os.RemoveAll(path.Join(disk, bucketNames[0])) - os.RemoveAll(path.Join(disk, minioMetaMultipartBucket, bucketNames[0])) + os.RemoveAll(path.Join(disk, minioMetaMultipartBucket, obj.(*xlObjects).getMultipartSHADir(bucketNames[0], objectNames[0]))) } _, _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath) diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 1f87df46e..297f800fc 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "fmt" "path" + "sort" "strings" "sync" "time" @@ -29,158 +30,17 @@ import ( "github.com/minio/minio/pkg/mimedb" ) -// updateUploadJSON - add or remove upload ID info in all `uploads.json`. -func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, writeQuorum int, isRemove bool) error { - uploadsPath := path.Join(bucket, object, uploadsJSONFile) - tmpUploadsPath := mustGetUUID() - - // slice to store errors from disks - errs := make([]error, len(xl.getDisks())) - // slice to store if it is a delete operation on a disk - isDelete := make([]bool, len(xl.getDisks())) - - wg := sync.WaitGroup{} - for index, disk := range xl.getDisks() { - if disk == nil { - errs[index] = errors.Trace(errDiskNotFound) - continue - } - // Update `uploads.json` in a go routine. - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - - // read and parse uploads.json on this disk - uploadsJSON, err := readUploadsJSON(bucket, object, disk) - if errors.Cause(err) == errFileNotFound { - // If file is not found, we assume an - // default (empty) upload info. - uploadsJSON, err = newUploadsV1("xl"), nil - } - // If we have a read error, we store error and - // exit. - if err != nil { - errs[index] = err - return - } - - if !isRemove { - // Add the uploadID - uploadsJSON.AddUploadID(uploadID, initiated) - } else { - // Remove the upload ID - uploadsJSON.RemoveUploadID(uploadID) - if len(uploadsJSON.Uploads) == 0 { - isDelete[index] = true - } - } - - // For delete, rename to tmp, for the - // possibility of recovery in case of quorum - // failure. - if !isDelete[index] { - errs[index] = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, disk) - } else { - wErr := disk.RenameFile(minioMetaMultipartBucket, uploadsPath, minioMetaTmpBucket, tmpUploadsPath) - if wErr != nil { - errs[index] = errors.Trace(wErr) - } - - } - }(index, disk) - } - - // Wait for all the writes to finish. - wg.Wait() - - err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) - if errors.Cause(err) == errXLWriteQuorum { - // No quorum. Perform cleanup on the minority of disks - // on which the operation succeeded. - - // There are two cases: - // - // 1. uploads.json file was updated -> we delete the - // file that we successfully overwrote on the - // minority of disks, so that the failed quorum - // operation is not partially visible. - // - // 2. uploads.json was deleted -> in this case since - // the delete failed, we restore from tmp. - for index, disk := range xl.getDisks() { - if disk == nil || errs[index] != nil { - continue - } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - if !isDelete[index] { - _ = disk.DeleteFile( - minioMetaMultipartBucket, - uploadsPath, - ) - } else { - _ = disk.RenameFile( - minioMetaTmpBucket, tmpUploadsPath, - minioMetaMultipartBucket, uploadsPath, - ) - } - }(index, disk) - } - wg.Wait() - return err - } - - // we do have quorum, so in case of delete upload.json file - // operation, we purge from tmp. - for index, disk := range xl.getDisks() { - if disk == nil || !isDelete[index] { - continue - } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - // isDelete[index] = true at this point. - _ = disk.DeleteFile(minioMetaTmpBucket, tmpUploadsPath) - }(index, disk) - } - wg.Wait() - return err -} - -// addUploadID - add upload ID and its initiated time to 'uploads.json'. -func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time, writeQuorum int) error { - return xl.updateUploadJSON(bucket, object, uploadID, initiated, writeQuorum, false) +func (xl xlObjects) getUploadIDDir(bucket, object, uploadID string) string { + return pathJoin(xl.getMultipartSHADir(bucket, object), uploadID) } -// removeUploadID - remove upload ID in 'uploads.json'. -func (xl xlObjects) removeUploadID(bucket, object string, uploadID string, writeQuorum int) error { - return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, writeQuorum, true) -} - -// Returns if the prefix is a multipart upload. -func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { - for _, disk := range xl.getLoadBalancedDisks() { - if disk == nil { - continue - } - _, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) - if err == nil { - return true - } - // For any reason disk was deleted or goes offline, continue - if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) { - continue - } - break - } - return false +func (xl xlObjects) getMultipartSHADir(bucket, object string) string { + return getSHA256Hash([]byte(pathJoin(bucket, object))) } // isUploadIDExists - verify if a given uploadID exists and is valid. func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool { - uploadIDPath := path.Join(bucket, object, uploadID) - return xl.isObject(minioMetaMultipartBucket, uploadIDPath) + return xl.isObject(minioMetaMultipartBucket, xl.getUploadIDDir(bucket, object, uploadID)) } // Removes part given by partName belonging to a mulitpart upload from minioMetaBucket @@ -206,7 +66,7 @@ func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) // statPart - returns fileInfo structure for a successful stat on part file. func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) { var ignoredErrs []error - partNamePath := path.Join(bucket, object, uploadID, partName) + partNamePath := path.Join(xl.getUploadIDDir(bucket, object, uploadID), partName) for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { ignoredErrs = append(ignoredErrs, errDiskNotFound) @@ -271,174 +131,6 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr return evalDisks(disks, mErrs), err } -// listMultipartUploadsCleanup - lists all multipart uploads. Called by xl.cleanupStaleMultipartUpload() -func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { - result := ListMultipartsInfo{ - IsTruncated: true, - MaxUploads: maxUploads, - KeyMarker: keyMarker, - Prefix: prefix, - Delimiter: delimiter, - } - - recursive := true - if delimiter == slashSeparator { - recursive = false - } - - // Not using path.Join() as it strips off the trailing '/'. - multipartPrefixPath := pathJoin(bucket, prefix) - if prefix == "" { - // Should have a trailing "/" if prefix is "" - // For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is "" - multipartPrefixPath += slashSeparator - } - multipartMarkerPath := "" - if keyMarker != "" { - multipartMarkerPath = pathJoin(bucket, keyMarker) - } - var uploads []MultipartInfo - var err error - var eof bool - // List all upload ids for the keyMarker starting from - // uploadIDMarker first. - if uploadIDMarker != "" { - // hold lock on keyMarker path - keyMarkerLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, keyMarker)) - if err = keyMarkerLock.GetRLock(globalListingTimeout); err != nil { - return lmi, err - } - for _, disk := range xl.getLoadBalancedDisks() { - if disk == nil { - continue - } - uploads, _, err = xl.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) - if err == nil { - break - } - if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) { - continue - } - break - } - keyMarkerLock.RUnlock() - if err != nil { - return lmi, err - } - maxUploads = maxUploads - len(uploads) - } - var walkerCh chan treeWalkResult - var walkerDoneCh chan struct{} - heal := false // true only for xl.ListObjectsHeal - // Validate if we need to list further depending on maxUploads. - if maxUploads > 0 { - walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal}) - if walkerCh == nil { - walkerDoneCh = make(chan struct{}) - isLeaf := xl.isMultipartUpload - listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...) - walkerCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh) - } - // Collect uploads until we have reached maxUploads count to 0. - for maxUploads > 0 { - walkResult, ok := <-walkerCh - if !ok { - // Closed channel. - eof = true - break - } - // For any walk error return right away. - if walkResult.err != nil { - return lmi, walkResult.err - } - entry := strings.TrimPrefix(walkResult.entry, retainSlash(bucket)) - // For an entry looking like a directory, store and - // continue the loop not need to fetch uploads. - if hasSuffix(walkResult.entry, slashSeparator) { - uploads = append(uploads, MultipartInfo{ - Object: entry, - }) - maxUploads-- - if maxUploads == 0 { - eof = true - break - } - continue - } - var newUploads []MultipartInfo - var end bool - uploadIDMarker = "" - - // For the new object entry we get all its - // pending uploadIDs. - entryLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, entry)) - if err = entryLock.GetRLock(globalListingTimeout); err != nil { - return lmi, err - } - var disk StorageAPI - for _, disk = range xl.getLoadBalancedDisks() { - if disk == nil { - continue - } - newUploads, end, err = xl.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk) - if err == nil { - break - } - if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) { - continue - } - break - } - entryLock.RUnlock() - if err != nil { - if errors.IsErrIgnored(err, xlTreeWalkIgnoredErrs...) { - continue - } - return lmi, err - } - uploads = append(uploads, newUploads...) - maxUploads -= len(newUploads) - if end && walkResult.end { - eof = true - break - } - } - } - // For all received uploads fill in the multiparts result. - for _, upload := range uploads { - var objectName string - var uploadID string - if hasSuffix(upload.Object, slashSeparator) { - // All directory entries are common prefixes. - uploadID = "" // For common prefixes, upload ids are empty. - objectName = upload.Object - result.CommonPrefixes = append(result.CommonPrefixes, objectName) - } else { - uploadID = upload.UploadID - objectName = upload.Object - result.Uploads = append(result.Uploads, upload) - } - result.NextKeyMarker = objectName - result.NextUploadIDMarker = uploadID - } - - if !eof { - // Save the go-routine state in the pool so that it can continue from where it left off on - // the next request. - xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkerCh, walkerDoneCh) - } - - result.IsTruncated = !eof - // Result is not truncated, reset the markers. - if !result.IsTruncated { - result.NextKeyMarker = "" - result.NextUploadIDMarker = "" - } - return result, nil -} - // ListMultipartUploads - lists all the pending multipart // uploads for a particular object in a bucket. // @@ -446,14 +138,11 @@ func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploa // not support prefix based listing, this is a deliberate attempt // towards simplification of multipart APIs. // The resulting ListMultipartsInfo structure is unmarshalled directly as XML. -func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { +func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, e error) { if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, xl); err != nil { - return lmi, err + return result, err } - result := ListMultipartsInfo{} - - result.IsTruncated = true result.MaxUploads = maxUploads result.KeyMarker = keyMarker result.Prefix = object @@ -463,31 +152,22 @@ func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMark if disk == nil { continue } - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. - objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) - if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { - return lmi, err - } - defer objectMPartPathLock.RUnlock() - uploads, _, err := xl.listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads, disk) + uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, xl.getMultipartSHADir(bucket, object)) if err != nil { - return lmi, err + if err == errFileNotFound { + return result, nil + } + return result, errors.Trace(err) } - - result.NextKeyMarker = object - // Loop through all the received uploads fill in the multiparts result. - for _, upload := range uploads { - uploadID := upload.UploadID - result.Uploads = append(result.Uploads, upload) - result.NextUploadIDMarker = uploadID + for i := range uploadIDs { + uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], slashSeparator) } - - result.IsTruncated = len(uploads) == maxUploads - - if !result.IsTruncated { - result.NextKeyMarker = "" - result.NextUploadIDMarker = "" + sort.Strings(uploadIDs) + for _, uploadID := range uploadIDs { + if len(result.Uploads) == maxUploads { + break + } + result.Uploads = append(result.Uploads, MultipartInfo{Object: object, UploadID: uploadID}) } break } @@ -527,17 +207,8 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st xlMeta.Stat.ModTime = UTCNow() xlMeta.Meta = meta - // This lock needs to be held for any changes to the directory - // contents of ".minio.sys/multipart/object/" - objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, object)) - if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { - return "", err - } - defer objectMPartPathLock.Unlock() - uploadID := mustGetUUID() - uploadIDPath := path.Join(bucket, object, uploadID) + uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID) tempUploadIDPath := uploadID // Write updated `xl.json` to all disks. @@ -556,11 +227,6 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } - initiated := UTCNow() - // Create or update 'uploads.json' - if err = xl.addUploadID(bucket, object, uploadID, initiated, writeQuorum); err != nil { - return "", err - } // Return success. return uploadID, nil } @@ -637,17 +303,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d return pi, toObjectErr(errors.Trace(errInvalidArgument)) } - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. - objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) - if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { - return pi, err - } - defer objectMPartPathLock.Unlock() - var partsMetadata []xlMetaV1 var errs []error - uploadIDPath := pathJoin(bucket, object, uploadID) + uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID) // pre-check upload id lock. preUploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) @@ -803,16 +461,35 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d }, nil } -// listObjectParts - wrapper reading `xl.json` for a given object and -// uploadID. Lists all the parts captured inside `xl.json` content. -func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { - result := ListPartsInfo{} +// ListObjectParts - lists all previously uploaded parts for a given +// object and uploadID. Takes additional input of part-number-marker +// to indicate where the listing should begin from. +// +// Implements S3 compatible ListObjectParts API. The resulting +// ListPartsInfo structure is marshalled directly into XML and +// replied back to the client. +func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (result ListPartsInfo, e error) { + if err := checkListPartsArgs(bucket, object, xl); err != nil { + return result, err + } + // Hold lock so that there is no competing + // abort-multipart-upload or complete-multipart-upload. + uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, + xl.getUploadIDDir(bucket, object, uploadID)) + if err := uploadIDLock.GetLock(globalListingTimeout); err != nil { + return result, err + } + defer uploadIDLock.Unlock() + + if !xl.isUploadIDExists(bucket, object, uploadID) { + return result, errors.Trace(InvalidUploadID{UploadID: uploadID}) + } - uploadIDPath := path.Join(bucket, object, uploadID) + uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID) xlParts, xlMeta, err := xl.readXLMetaParts(minioMetaMultipartBucket, uploadIDPath) if err != nil { - return lpi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) + return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } // Populate the result stub. @@ -844,7 +521,7 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM var fi FileInfo fi, err = xl.statPart(bucket, object, uploadID, part.Name) if err != nil { - return lpi, toObjectErr(err, minioMetaBucket, path.Join(uploadID, part.Name)) + return result, toObjectErr(err, minioMetaBucket, path.Join(uploadID, part.Name)) } result.Parts = append(result.Parts, PartInfo{ PartNumber: part.Number, @@ -868,40 +545,6 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM return result, nil } -// ListObjectParts - lists all previously uploaded parts for a given -// object and uploadID. Takes additional input of part-number-marker -// to indicate where the listing should begin from. -// -// Implements S3 compatible ListObjectParts API. The resulting -// ListPartsInfo structure is unmarshalled directly into XML and -// replied back to the client. -func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { - if err := checkListPartsArgs(bucket, object, xl); err != nil { - return lpi, err - } - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. - objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) - if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { - return lpi, errors.Trace(err) - } - defer objectMPartPathLock.RUnlock() - // Hold lock so that there is no competing - // abort-multipart-upload or complete-multipart-upload. - uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, object, uploadID)) - if err := uploadIDLock.GetLock(globalListingTimeout); err != nil { - return lpi, err - } - defer uploadIDLock.Unlock() - - if !xl.isUploadIDExists(bucket, object, uploadID) { - return lpi, errors.Trace(InvalidUploadID{UploadID: uploadID}) - } - result, err := xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) - return result, err -} - // CompleteMultipartUpload - completes an ongoing multipart // transaction after receiving all the parts indicated by the client. // Returns an md5sum calculated by concatenating all the individual @@ -918,14 +561,16 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return oi, err } defer destLock.Unlock() + + uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID) + // Hold lock so that // // 1) no one aborts this multipart upload // // 2) no one does a parallel complete-multipart-upload on this // multipart upload - uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, object, uploadID)) + uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil { return oi, err } @@ -947,8 +592,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return oi, err } - uploadIDPath := pathJoin(bucket, object, uploadID) - // Read metadata associated with the object from all disks. partsMetadata, errs := readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket, uploadIDPath) @@ -1034,7 +677,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Save successfully calculated md5sum. xlMeta.Meta["etag"] = s3MD5 - uploadIDPath = path.Join(bucket, object, uploadID) tempUploadIDPath := uploadID // Update all xl metadata, make sure to not modify fields like @@ -1090,21 +732,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return oi, toObjectErr(err, bucket, object) } - // Hold the lock so that two parallel - // complete-multipart-uploads do not leave a stale - // uploads.json behind. - objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, object)) - if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { - return oi, toObjectErr(err, bucket, object) - } - defer objectMPartPathLock.Unlock() - - // remove entry from uploads.json with quorum - if err = xl.removeUploadID(bucket, object, uploadID, writeQuorum); err != nil { - return oi, toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object)) - } - // Success, return object info. return xlMeta.ToObjectInfo(bucket, object), nil } @@ -1137,46 +764,6 @@ func (xl xlObjects) cleanupUploadedParts(uploadIDPath string, writeQuorum int) e return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) } -// abortMultipartUpload - wrapper for purging an ongoing multipart -// transaction, deletes uploadID entry from `uploads.json` and purges -// the directory at '.minio.sys/multipart/bucket/object/uploadID' holding -// all the upload parts. -func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err error) { - // Construct uploadIDPath. - uploadIDPath := path.Join(bucket, object, uploadID) - - // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket, uploadIDPath) - - // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs) - if err != nil { - return toObjectErr(err, bucket, object) - } - - // Cleanup all uploaded parts. - if err = xl.cleanupUploadedParts(uploadIDPath, writeQuorum); err != nil { - return toObjectErr(err, bucket, object) - } - - // hold lock so we don't compete with a complete, or abort - // multipart request. - objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, object)) - if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil { - return toObjectErr(err, bucket, object) - } - defer objectMPartPathLock.Unlock() - - // remove entry from uploads.json with quorum - if err = xl.removeUploadID(bucket, object, uploadID, writeQuorum); err != nil { - return toObjectErr(err, bucket, object) - } - - // Successfully purged. - return nil -} - // AbortMultipartUpload - aborts an ongoing multipart operation // signified by the input uploadID. This is an atomic operation // doesn't require clients to initiate multiple such requests. @@ -1192,10 +779,11 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error if err := checkAbortMultipartArgs(bucket, object, xl); err != nil { return err } + // Construct uploadIDPath. + uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID) // Hold lock so that there is no competing // complete-multipart-upload or put-object-part. - uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, object, uploadID)) + uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil { return err } @@ -1204,5 +792,76 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error if !xl.isUploadIDExists(bucket, object, uploadID) { return errors.Trace(InvalidUploadID{UploadID: uploadID}) } - return xl.abortMultipartUpload(bucket, object, uploadID) + + // Read metadata associated with the object from all disks. + partsMetadata, errs := readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket, uploadIDPath) + + // get Quorum for this object + _, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs) + if err != nil { + return toObjectErr(err, bucket, object) + } + + // Cleanup all uploaded parts. + if err = xl.cleanupUploadedParts(uploadIDPath, writeQuorum); err != nil { + return toObjectErr(err, bucket, object) + } + + // Successfully purged. + return nil +} + +// Clean-up the old multipart uploads. Should be run in a Go routine. +func (xl xlObjects) cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, doneCh chan struct{}) { + ticker := time.NewTicker(cleanupInterval) + + for { + select { + case <-doneCh: + ticker.Stop() + return + case <-ticker.C: + var disk StorageAPI + for _, d := range xl.getLoadBalancedDisks() { + if d != nil { + disk = d + break + } + } + if disk == nil { + continue + } + xl.cleanupStaleMultipartUploadsOnDisk(disk, expiry) + } + } +} + +// Remove the old multipart uploads on the given disk. +func (xl xlObjects) cleanupStaleMultipartUploadsOnDisk(disk StorageAPI, expiry time.Duration) { + now := time.Now() + shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "") + if err != nil { + return + } + for _, shaDir := range shaDirs { + uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir) + if err != nil { + continue + } + for _, uploadIDDir := range uploadIDDirs { + uploadIDPath := pathJoin(shaDir, uploadIDDir) + fi, err := disk.StatFile(minioMetaMultipartBucket, pathJoin(uploadIDPath, xlMetaJSONFile)) + if err != nil { + continue + } + if now.Sub(fi.ModTime) > expiry { + // Quorum value will need to be figured out using readAllXLMetadata() and objectQuorumFromMeta() + // But we can avoid these calls as we do not care if xl.cleanupUploadedParts() meets quorum + // when it removes files. We igore the error message from xl.cleanupUploadedParts() as we can't + // return it to any client. Hence we set quorum to 0. + quorum := 0 + xl.cleanupUploadedParts(uploadIDPath, quorum) + } + } + } } diff --git a/cmd/xl-v1-multipart_test.go b/cmd/xl-v1-multipart_test.go index 4febe5838..27a016353 100644 --- a/cmd/xl-v1-multipart_test.go +++ b/cmd/xl-v1-multipart_test.go @@ -25,7 +25,7 @@ import ( ) // Tests cleanup multipart uploads for erasure coded backend. -func TestXLCleanupMultipartUploadsInRoutine(t *testing.T) { +func TestXLCleanupStaleMultipartUploads(t *testing.T) { // Initialize configuration root, err := newTestConfig(globalMinioDefaultRegion) if err != nil { @@ -56,7 +56,7 @@ func TestXLCleanupMultipartUploadsInRoutine(t *testing.T) { t.Fatal("Unexpected err: ", err) } - go cleanupStaleMultipartUploads(20*time.Millisecond, 0, obj, xl.listMultipartUploadsCleanup, globalServiceDoneCh) + go xl.cleanupStaleMultipartUploads(20*time.Millisecond, 0, globalServiceDoneCh) // Wait for 40ms such that - we have given enough time for // cleanup routine to kick in. @@ -73,102 +73,3 @@ func TestXLCleanupMultipartUploadsInRoutine(t *testing.T) { } } } - -// Tests cleanup of stale upload ids. -func TestXLCleanupMultipartUpload(t *testing.T) { - // Initialize configuration - root, err := newTestConfig(globalMinioDefaultRegion) - if err != nil { - t.Fatalf("%s", err) - } - defer os.RemoveAll(root) - - // Create an instance of xl backend - obj, fsDirs, err := prepareXL16() - if err != nil { - t.Fatal(err) - } - // Defer cleanup of backend directories - defer removeRoots(fsDirs) - - xl := obj.(*xlObjects) - - // Close the go-routine, we are going to - // manually start it and test in this test case. - globalServiceDoneCh <- struct{}{} - - bucketName := "bucket" - objectName := "object" - - obj.MakeBucketWithLocation(bucketName, "") - uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil) - if err != nil { - t.Fatal("Unexpected err: ", err) - } - - if err = cleanupStaleMultipartUpload(bucketName, 0, obj, xl.listMultipartUploadsCleanup); err != nil { - t.Fatal("Unexpected err: ", err) - } - - // Check if upload id was already purged. - if err = obj.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil { - err = errors.Cause(err) - if _, ok := err.(InvalidUploadID); !ok { - t.Fatal("Unexpected err: ", err) - } - } -} - -func TestUpdateUploadJSON(t *testing.T) { - // Initialize configuration - root, err := newTestConfig(globalMinioDefaultRegion) - if err != nil { - t.Fatalf("%s", err) - } - defer os.RemoveAll(root) - - // Create an instance of xl backend - obj, fsDirs, err := prepareXL16() - if err != nil { - t.Fatal(err) - } - // Defer cleanup of backend directories - defer removeRoots(fsDirs) - - bucket, object := "bucket", "object" - err = obj.MakeBucketWithLocation(bucket, "") - if err != nil { - t.Fatal(err) - } - - testCases := []struct { - uploadID string - initiated time.Time - writeQuorum int - isRemove bool - errVal error - }{ - {"111abc", UTCNow(), 9, false, nil}, - {"222abc", UTCNow(), 10, false, nil}, - {"111abc", time.Time{}, 11, true, nil}, - } - - xl := obj.(*xlObjects) - for i, test := range testCases { - testErrVal := xl.updateUploadJSON(bucket, object, test.uploadID, test.initiated, test.writeQuorum, test.isRemove) - if testErrVal != test.errVal { - t.Errorf("Test %d: Expected error value %v, but got %v", - i+1, test.errVal, testErrVal) - } - } - - // make some disks faulty to simulate a failure. - for i := range xl.storageDisks[:9] { - xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i], nil, errFaultyDisk) - } - - testErrVal := xl.updateUploadJSON(bucket, object, "222abc", UTCNow(), 10, false) - if testErrVal == nil || testErrVal.Error() != errXLWriteQuorum.Error() { - t.Errorf("Expected write quorum error, but got: %v", testErrVal) - } -} diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index f217d9b0a..b37575a48 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -29,9 +29,6 @@ import ( const ( // XL metadata file carries per object metadata. xlMetaJSONFile = "xl.json" - - // Uploads metadata file carries per multipart object metadata. - uploadsJSONFile = "uploads.json" ) // xlObjects - Implements XL object layer.