diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index 17ab5983b..6abfa953c 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -422,7 +422,7 @@ func TestHealEmptyDirectoryErasure(t *testing.T) { z := obj.(*erasureZones) er := z.zones[0].sets[0] firstDisk := er.getDisks()[0] - err = firstDisk.DeleteFile(context.Background(), bucket, object) + err = firstDisk.DeleteVol(context.Background(), pathJoin(bucket, encodeDirObject(object)), true) if err != nil { t.Fatalf("Failed to delete a file - %v", err) } @@ -434,7 +434,7 @@ func TestHealEmptyDirectoryErasure(t *testing.T) { } // Check if the empty directory is restored in the first disk - _, err = firstDisk.StatVol(context.Background(), pathJoin(bucket, object)) + _, err = firstDisk.StatVol(context.Background(), pathJoin(bucket, encodeDirObject(object))) if err != nil { t.Fatalf("Expected object to be present but stat failed - %v", err) } diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 4faffd719..abd538558 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -96,21 +96,14 @@ func (fi FileInfo) IsValid() bool { // ToObjectInfo - Converts metadata to object info. func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { - if HasSuffix(object, SlashSeparator) { - return ObjectInfo{ - Bucket: bucket, - Name: object, - IsDir: true, - } - } - + object = decodeDirObject(object) versionID := fi.VersionID if globalBucketVersioningSys.Enabled(bucket) && versionID == "" { versionID = nullVersionID } objInfo := ObjectInfo{ - IsDir: false, + IsDir: HasSuffix(object, SlashSeparator), Bucket: bucket, Name: object, VersionID: versionID, diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 9dd4605f4..b622bff68 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -460,6 +460,8 @@ func (z *erasureZones) MakeBucketWithLocation(ctx context.Context, bucket string } func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { + object = encodeDirObject(object) + for _, zone := range z.zones { gr, err = zone.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) if err != nil { @@ -477,6 +479,8 @@ func (z *erasureZones) GetObjectNInfo(ctx context.Context, bucket, object string } func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { + object = encodeDirObject(object) + for _, zone := range z.zones { if err := zone.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil { if isErrObjectNotFound(err) || isErrVersionNotFound(err) { @@ -493,6 +497,7 @@ func (z *erasureZones) GetObject(ctx context.Context, bucket, object string, sta } func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { + object = encodeDirObject(object) for _, zone := range z.zones { objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts) if err != nil { @@ -503,6 +508,7 @@ func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, } return objInfo, nil } + object = decodeDirObject(object) if opts.VersionID != "" { return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} } @@ -511,6 +517,8 @@ func (z *erasureZones) GetObjectInfo(ctx context.Context, bucket, object string, // PutObject - writes an object to least used erasure zone. func (z *erasureZones) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) { + object = encodeDirObject(object) + if z.SingleZone() { return z.zones[0].PutObject(ctx, bucket, object, data, opts) } @@ -525,6 +533,8 @@ func (z *erasureZones) PutObject(ctx context.Context, bucket string, object stri } func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { + object = encodeDirObject(object) + if z.SingleZone() { return z.zones[0].DeleteObject(ctx, bucket, object, opts) } @@ -545,6 +555,8 @@ func (z *erasureZones) DeleteObjects(ctx context.Context, bucket string, objects dobjects := make([]DeletedObject, len(objects)) objSets := set.NewStringSet() for i := range derrs { + objects[i].ObjectName = encodeDirObject(objects[i].ObjectName) + derrs[i] = checkDelObjArgs(ctx, bucket, objects[i].ObjectName) objSets.Add(objects[i].ObjectName) } @@ -576,6 +588,9 @@ func (z *erasureZones) DeleteObjects(ctx context.Context, bucket string, objects } func (z *erasureZones) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) { + srcObject = encodeDirObject(srcObject) + dstObject = encodeDirObject(dstObject) + cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) zoneIdx, err := z.getZoneIdx(ctx, dstBucket, dstObject, dstOpts, srcInfo.Size) @@ -935,7 +950,16 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI zoneIndex = i continue } - if zoneEntries[i][j].Name < lentry.Name { + str1 := zoneEntries[i][j].Name + str2 := lentry.Name + if HasSuffix(str1, globalDirSuffix) { + str1 = strings.TrimSuffix(str1, globalDirSuffix) + slashSeparator + } + if HasSuffix(str2, globalDirSuffix) { + str2 = strings.TrimSuffix(str2, globalDirSuffix) + slashSeparator + } + + if str1 < str2 { lentry = zoneEntries[i][j] zoneIndex = i } @@ -968,6 +992,10 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI } } + if HasSuffix(lentry.Name, globalDirSuffix) { + lentry.Name = strings.TrimSuffix(lentry.Name, globalDirSuffix) + slashSeparator + } + return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated } @@ -1013,7 +1041,16 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE zoneIndex = i continue } - if zoneEntries[i][j].Name < lentry.Name { + str1 := zoneEntries[i][j].Name + str2 := lentry.Name + if HasSuffix(str1, globalDirSuffix) { + str1 = strings.TrimSuffix(str1, globalDirSuffix) + slashSeparator + } + if HasSuffix(str2, globalDirSuffix) { + str2 = strings.TrimSuffix(str2, globalDirSuffix) + slashSeparator + } + + if str1 < str2 { lentry = zoneEntries[i][j] zoneIndex = i } @@ -1046,6 +1083,10 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE } } + if HasSuffix(lentry.Name, globalDirSuffix) { + lentry.Name = strings.TrimSuffix(lentry.Name, globalDirSuffix) + slashSeparator + } + return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated } @@ -1058,6 +1099,7 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfoVersions, len(entryChs))) zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs))) } + for { fi, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) if !ok { @@ -1089,6 +1131,7 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, zonesListTol zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs))) zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs))) } + var prevEntry string for { fi, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) if !ok { @@ -1101,12 +1144,17 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, zonesListTol continue } + if HasSuffix(fi.Name, slashSeparator) && fi.Name == prevEntry { + continue + } + entries.Files = append(entries.Files, fi) i++ if i == maxKeys { entries.IsTruncated = isTruncatedZones(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) break } + prevEntry = fi.Name } return entries } @@ -1836,6 +1884,8 @@ func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, o } func (z *erasureZones) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) { + object = encodeDirObject(object) + lk := z.NewNSLock(ctx, bucket, object) if bucket == minioMetaBucket { // For .minio.sys bucket heals we should hold write locks. @@ -1956,6 +2006,7 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD) diskCount := z.SetDriveCount() + if parityDrives == 0 { parityDrives = getDefaultParityBlocks(diskCount) } @@ -2019,6 +2070,7 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes // PutObjectTags - replace or add tags to an existing object func (z *erasureZones) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error { + object = encodeDirObject(object) if z.SingleZone() { return z.zones[0].PutObjectTags(ctx, bucket, object, tags, opts) } @@ -2048,6 +2100,7 @@ func (z *erasureZones) PutObjectTags(ctx context.Context, bucket, object string, // DeleteObjectTags - delete object tags from an existing object func (z *erasureZones) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) error { + object = encodeDirObject(object) if z.SingleZone() { return z.zones[0].DeleteObjectTags(ctx, bucket, object, opts) } @@ -2076,6 +2129,7 @@ func (z *erasureZones) DeleteObjectTags(ctx context.Context, bucket, object stri // GetObjectTags - get object tags from an existing object func (z *erasureZones) GetObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (*tags.Tags, error) { + object = encodeDirObject(object) if z.SingleZone() { return z.zones[0].GetObjectTags(ctx, bucket, object, opts) } diff --git a/cmd/globals.go b/cmd/globals.go index 91eb0fb07..889ca4a8e 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -63,6 +63,8 @@ const ( globalMinioModeErasure = "mode-server-xl" globalMinioModeDistErasure = "mode-server-distributed-xl" globalMinioModeGatewayPrefix = "mode-gateway-" + globalDirSuffix = "__XLDIR__" + globalDirSuffixWithSlash = globalDirSuffix + slashSeparator // Add new global values here. ) diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index caeeaa415..1893c0aad 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -22,12 +22,18 @@ import ( "fmt" "io" "path" + "strings" ) // Converts underlying storage error. Convenience function written to // handle all cases where we have known types of errors returned by // underlying storage layer. func toObjectErr(err error, params ...string) error { + if len(params) > 1 { + if HasSuffix(params[1], globalDirSuffix) { + params[1] = strings.TrimSuffix(params[1], globalDirSuffix) + slashSeparator + } + } switch err { case errVolumeNotFound: if len(params) >= 1 { diff --git a/cmd/server_test.go b/cmd/server_test.go index a2acb9f25..adcecba4b 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -131,6 +131,7 @@ func TestServerSuite(t *testing.T) { // Init and run test on ErasureSet backend. {serverType: "ErasureSet", signer: signerV4}, } + globalCLIContext.StrictS3Compat = true for i, testCase := range testCases { t.Run(fmt.Sprintf("Test: %d, ServerType: %s", i+1, testCase.serverType), func(t *testing.T) { runAllTests(testCase, &check{t, testCase.serverType}) @@ -262,20 +263,6 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) { // assert the http response status code. c.Assert(response.StatusCode, http.StatusOK) - request, err = newTestSignedRequest(http.MethodPut, getPutObjectURL(s.endPoint, bucketName, "my-object-directory/"), - 0, nil, s.accessKey, s.secretKey, s.signer) - c.Assert(err, nil) - - helloReader := bytes.NewReader([]byte("Hello, World")) - request.ContentLength = helloReader.Size() - request.Body = ioutil.NopCloser(helloReader) - - // execute the HTTP request. - response, err = s.client.Do(request) - - c.Assert(err, nil) - verifyError(c, response, "XMinioInvalidObjectName", "Object name contains unsupported characters.", http.StatusBadRequest) - request, err = newTestSignedRequest(http.MethodHead, getHeadObjectURL(s.endPoint, bucketName, "my-object-directory/"), 0, nil, s.accessKey, s.secretKey, s.signer) c.Assert(err, nil) diff --git a/cmd/tree-walk.go b/cmd/tree-walk.go index f307e31da..21003f149 100644 --- a/cmd/tree-walk.go +++ b/cmd/tree-walk.go @@ -57,6 +57,9 @@ func filterMatchingPrefix(entries []string, prefixEntry string) []string { // isLeaf should be done in listDir() func delayIsLeafCheck(entries []string) bool { for i, entry := range entries { + if HasSuffix(entry, globalDirSuffixWithSlash) { + return false + } if i == len(entries)-1 { break } @@ -96,7 +99,20 @@ func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry s entries = filterMatchingPrefix(entries, prefixEntry) // Listing needs to be sorted. - sort.Strings(entries) + sort.Slice(entries, func(i, j int) bool { + if !HasSuffix(entries[i], globalDirSuffixWithSlash) && !HasSuffix(entries[j], globalDirSuffixWithSlash) { + return entries[i] < entries[j] + } + first := entries[i] + second := entries[j] + if HasSuffix(first, globalDirSuffixWithSlash) { + first = strings.TrimSuffix(first, globalDirSuffixWithSlash) + slashSeparator + } + if HasSuffix(second, globalDirSuffixWithSlash) { + second = strings.TrimSuffix(second, globalDirSuffixWithSlash) + slashSeparator + } + return first < second + }) // Can isLeaf() check be delayed till when it has to be sent down the // TreeWalkResult channel? @@ -114,8 +130,23 @@ func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry s // Sort again after removing trailing "/" for objects as the previous sort // does not hold good anymore. - sort.Strings(entries) - + sort.Slice(entries, func(i, j int) bool { + if !HasSuffix(entries[i], globalDirSuffix) && !HasSuffix(entries[j], globalDirSuffix) { + return entries[i] < entries[j] + } + first := entries[i] + second := entries[j] + if HasSuffix(first, globalDirSuffix) { + first = strings.TrimSuffix(first, globalDirSuffix) + slashSeparator + } + if HasSuffix(second, globalDirSuffix) { + second = strings.TrimSuffix(second, globalDirSuffix) + slashSeparator + } + if first == second { + return HasSuffix(entries[i], globalDirSuffix) + } + return first < second + }) return entries, false } @@ -169,10 +200,10 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker entry = strings.TrimSuffix(entry, slashSeparator) } } else { - leaf = !strings.HasSuffix(entry, slashSeparator) + leaf = !HasSuffix(entry, slashSeparator) } - if strings.HasSuffix(entry, slashSeparator) { + if HasSuffix(entry, slashSeparator) { leafDir = isLeafDir(bucket, pathJoin(prefixDir, entry)) } diff --git a/cmd/utils.go b/cmd/utils.go index fa504d172..e99f9fd55 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -737,3 +737,20 @@ func (t *timedValue) Invalidate() { t.value = nil t.mu.Unlock() } + +// On MinIO a directory object is stored as a regular object with "__XLDIR__" suffix. +// For ex. "prefix/" is stored as "prefix__XLDIR__" +func encodeDirObject(object string) string { + if HasSuffix(object, slashSeparator) { + return strings.TrimSuffix(object, slashSeparator) + globalDirSuffix + } + return object +} + +// Reverse process of encodeDirObject() +func decodeDirObject(object string) string { + if HasSuffix(object, globalDirSuffix) { + return strings.TrimSuffix(object, globalDirSuffix) + slashSeparator + } + return object +}