diff --git a/cmd/bucket-quota.go b/cmd/bucket-quota.go index 84288d166..6ce4e0a14 100644 --- a/cmd/bucket-quota.go +++ b/cmd/bucket-quota.go @@ -196,8 +196,10 @@ func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) { // Allocate new results channel to receive ObjectInfo. objInfoCh := make(chan ObjectInfo) + versioned := globalBucketVersioningSys.Enabled(bucket) + // Walk through all objects - if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil { + if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkVersions: versioned}); err != nil { logger.LogIf(ctx, err) continue } @@ -226,8 +228,6 @@ func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) { scorer.addFileWithObjInfo(obj, 1) } - versioned := globalBucketVersioningSys.Enabled(bucket) - var objects []ObjectToDelete numKeys := len(scorer.fileObjInfos()) for i, obj := range scorer.fileObjInfos() { diff --git a/cmd/erasure-list-objects.go b/cmd/erasure-list-objects.go index d5c2c6b0d..74640d467 100644 --- a/cmd/erasure-list-objects.go +++ b/cmd/erasure-list-objects.go @@ -53,6 +53,6 @@ func (er erasureObjects) HealObjects(ctx context.Context, bucket, prefix string, } // Walk - This is not implemented/needed anymore, look for erasure-zones.Walk() -func (er erasureObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { +func (er erasureObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, _ ObjectOptions) error { return NotImplemented{} } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 3c04d83ba..d9f2f2d1c 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -855,8 +855,72 @@ func (f *FileInfoCh) Push(fi FileInfo) { f.Valid = true } -// Calculate least entry across multiple FileInfo channels, -// returns the least common entry and the total number of times +// Calculate lexically least entry across multiple FileInfo channels, +// returns the lexically common entry and the total 
number of times +// we found this entry. Additionally also returns a boolean +// to indicate if the caller needs to call this function +// again to list the next entry. It is callers responsibility +// if the caller wishes to list N entries to call lexicallySortedEntry +// N times until this boolean is 'false'. +func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) { + for i := range entryChs { + entries[i], entriesValid[i] = entryChs[i].Pop() + } + + var isTruncated = false + for _, valid := range entriesValid { + if !valid { + continue + } + isTruncated = true + break + } + + var lentry FileInfo + var found bool + for i, valid := range entriesValid { + if !valid { + continue + } + if !found { + lentry = entries[i] + found = true + continue + } + if entries[i].Name < lentry.Name { + lentry = entries[i] + } + } + + // We haven't been able to find any lexically least entry, + // this would mean that we don't have valid entry. + if !found { + return lentry, 0, isTruncated + } + + lexicallySortedEntryCount := 0 + for i, valid := range entriesValid { + if !valid { + continue + } + + // Entries are duplicated across disks, + // we should simply skip such entries. + if lentry.Name == entries[i].Name && lentry.ModTime.Equal(entries[i].ModTime) { + lexicallySortedEntryCount++ + continue + } + + // Push all entries which are lexically higher + // and will be returned later in Pop() + entryChs[i].Push(entries[i]) + } + + return lentry, lexicallySortedEntryCount, isTruncated +} + +// Calculate lexically least entry across multiple FileInfo channels, +// returns the lexically common entry and the total number of times // we found this entry. Additionally also returns a boolean // to indicate if the caller needs to call this function // again to list the next entry. 
It is callers responsibility @@ -892,7 +956,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI } } - // We haven't been able to find any least entry, + // We haven't been able to find any lexically least entry, // this would mean that we don't have valid entry. if !found { return lentry, 0, isTruncated @@ -1508,32 +1572,58 @@ func (s *erasureSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) // to allocate a receive channel for ObjectInfo, upon any unhandled // error walker returns error. Optionally if context.Done() is received // then Walk() stops the walker. -func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { +func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { if err := checkListObjsArgs(ctx, bucket, prefix, "", s); err != nil { // Upon error close the channel. close(results) return err } - entryChs := s.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done()) + if opts.WalkVersions { + entryChs := s.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done()) + + entriesValid := make([]bool, len(entryChs)) + entries := make([]FileInfoVersions, len(entryChs)) + + go func() { + defer close(results) + + for { + entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) + if !ok { + return + } + + if quorumCount >= s.drivesPerSet/2 { + // Read quorum exists proceed + for _, version := range entry.Versions { + results <- version.ToObjectInfo(bucket, version.Name) + } + } + // skip entries which do not have quorum + } + }() + + return nil + } + + entryChs := s.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done()) entriesValid := make([]bool, len(entryChs)) - entries := make([]FileInfoVersions, len(entryChs)) + entries := make([]FileInfo, len(entryChs)) go func() { defer close(results) for { - entry, quorumCount, ok := 
lexicallySortedEntryVersions(entryChs, entries, entriesValid) + entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid) if !ok { return } if quorumCount >= s.drivesPerSet/2 { // Read quorum exists proceed - for _, version := range entry.Versions { - results <- version.ToObjectInfo(bucket, version.Name) - } + results <- entry.ToObjectInfo(bucket, entry.Name) } // skip entries which do not have quorum } diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 09b0498a2..5e61ac942 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -1799,16 +1799,58 @@ func (z *erasureZones) HealBucket(ctx context.Context, bucket string, dryRun, re // to allocate a receive channel for ObjectInfo, upon any unhandled // error walker returns error. Optionally if context.Done() is received // then Walk() stops the walker. -func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { +func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil { // Upon error close the channel. 
close(results) return err } - var zonesEntryChs [][]FileInfoVersionsCh + if opts.WalkVersions { + var zonesEntryChs [][]FileInfoVersionsCh + for _, zone := range z.zones { + zonesEntryChs = append(zonesEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done())) + } + + var zoneDrivesPerSet []int + for _, zone := range z.zones { + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + } + + var zonesEntriesInfos [][]FileInfoVersions + var zonesEntriesValid [][]bool + for _, entryChs := range zonesEntryChs { + zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfoVersions, len(entryChs))) + zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs))) + } + + go func() { + defer close(results) + + for { + entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) + if !ok { + // We have reached EOF across all entryChs, break the loop. + return + } + + if quorumCount >= zoneDrivesPerSet[zoneIndex]/2 { + // Read quorum exists proceed + for _, version := range entry.Versions { + results <- version.ToObjectInfo(bucket, version.Name) + } + } + + // skip entries which do not have quorum + } + }() + + return nil + } + + var zonesEntryChs [][]FileInfoCh for _, zone := range z.zones { - zonesEntryChs = append(zonesEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done())) + zonesEntryChs = append(zonesEntryChs, zone.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done())) } var zoneDrivesPerSet []int @@ -1816,10 +1858,10 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) } - var zonesEntriesInfos [][]FileInfoVersions + var zonesEntriesInfos [][]FileInfo var zonesEntriesValid [][]bool for _, entryChs := range zonesEntryChs { - zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfoVersions, len(entryChs))) + zonesEntriesInfos = 
append(zonesEntriesInfos, make([]FileInfo, len(entryChs))) zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs))) } @@ -1827,7 +1869,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results defer close(results) for { - entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) + entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) if !ok { // We have reached EOF across all entryChs, break the loop. return @@ -1835,11 +1877,8 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results if quorumCount >= zoneDrivesPerSet[zoneIndex]/2 { // Read quorum exists proceed - for _, version := range entry.Versions { - results <- version.ToObjectInfo(bucket, version.Name) - } + results <- entry.ToObjectInfo(bucket, entry.Name) } - // skip entries which do not have quorum } }() diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index f5e401727..339a90a5e 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1484,7 +1484,7 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo // to allocate a receive channel for ObjectInfo, upon any unhandled // error walker returns error. Optionally if context.Done() is received // then Walk() stops the walker. -func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { +func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { return fsWalk(ctx, fs, bucket, prefix, fs.listDirFactory(), results, fs.getObjectInfo, fs.getObjectInfo) } diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index a33d8598a..c45839205 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -57,7 +57,7 @@ func (l *GatewayLocker) NewNSLock(ctx context.Context, bucket string, objects .. 
} // Walk - implements common gateway level Walker, to walk on all objects recursively at a prefix -func (l *GatewayLocker) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { +func (l *GatewayLocker) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { walk := func(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { var marker string @@ -85,7 +85,7 @@ func (l *GatewayLocker) Walk(ctx context.Context, bucket, prefix string, results return nil } - if err := l.ObjectLayer.Walk(ctx, bucket, prefix, results); err != nil { + if err := l.ObjectLayer.Walk(ctx, bucket, prefix, results, opts); err != nil { if _, ok := err.(NotImplemented); ok { return walk(ctx, bucket, prefix, results) } diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 462304f22..c88ba3a4d 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -186,7 +186,7 @@ func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, c } // Walk - Not implemented stub -func (a GatewayUnsupported) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { +func (a GatewayUnsupported) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { return NotImplemented{} } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 5f530a877..426216ed6 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -38,6 +38,7 @@ type GetObjectInfoFn func(ctx context.Context, bucket, object string, opts Objec type ObjectOptions struct { ServerSideEncryption encrypt.ServerSide Versioned bool // indicates if the bucket is versioned + WalkVersions bool // indicates if we are interested in walking versions VersionID string // Specifies the versionID which needs to be overwritten or read MTime time.Time // Is only set in POST/PUT operations UserDefined 
map[string]string // only set in case of POST/PUT operations @@ -80,7 +81,7 @@ type ObjectLayer interface { ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error) // Walk lists all objects including versions, delete markers. - Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error + Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error // Object operations. diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 8c5791829..0fb5d61f2 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -655,8 +655,9 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs, return nil } - versioned := globalBucketVersioningSys.Enabled(args.BucketName) - + opts := ObjectOptions{ + Versioned: globalBucketVersioningSys.Enabled(args.BucketName), + } var err error next: for _, objectName := range args.Objects { @@ -690,7 +691,7 @@ next: } } - _, err = deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, r, ObjectOptions{}) + _, err = deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, r, opts) logger.LogIf(ctx, err) } @@ -723,7 +724,7 @@ next: objInfoCh := make(chan ObjectInfo) // Walk through all objects - if err = objectAPI.Walk(ctx, args.BucketName, objectName, objInfoCh); err != nil { + if err = objectAPI.Walk(ctx, args.BucketName, objectName, objInfoCh, ObjectOptions{}); err != nil { break next } @@ -736,7 +737,6 @@ next: } objects = append(objects, ObjectToDelete{ ObjectName: obj.Name, - VersionID: obj.VersionID, }) } @@ -746,7 +746,7 @@ next: } // Deletes a list of objects. 
- _, errs := deleteObjects(ctx, args.BucketName, objects, ObjectOptions{Versioned: versioned}) + _, errs := deleteObjects(ctx, args.BucketName, objects, opts) for _, err := range errs { if err != nil { logger.LogIf(ctx, err) @@ -1030,6 +1030,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { writeWebErrorResponse(w, err) return } + if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { // Storing the compression metadata. metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 @@ -1052,15 +1053,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } } + pReader = NewPutObjReader(hashReader, nil, nil) // get gateway encryption options - var opts ObjectOptions - opts, err = putOpts(ctx, r, bucket, object, metadata) - + opts, err := putOpts(ctx, r, bucket, object, metadata) if err != nil { writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) return } + if objectAPI.IsEncryptionSupported() { if crypto.IsRequested(r.Header) && !HasSuffix(object, SlashSeparator) { // handle SSE requests rawReader := hashReader @@ -1545,7 +1546,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { objInfoCh := make(chan ObjectInfo) // Walk through all objects - if err := objectAPI.Walk(ctx, args.BucketName, pathJoin(args.Prefix, object), objInfoCh); err != nil { + if err := objectAPI.Walk(ctx, args.BucketName, pathJoin(args.Prefix, object), objInfoCh, ObjectOptions{}); err != nil { logger.LogIf(ctx, err) continue }