diff --git a/cmd/metacache-bucket.go b/cmd/metacache-bucket.go index 8859c2085..49aebbe0e 100644 --- a/cmd/metacache-bucket.go +++ b/cmd/metacache-bucket.go @@ -228,6 +228,10 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { debugPrint("cache %s prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir) continue } + if cached.filter != "" && strings.HasPrefix(cached.filter, o.FilterPrefix) { + debugPrint("cache %s cannot be used because of filter %s", cached.id, cached.filter) + continue + } // If the existing listing wasn't recursive root must match. if !cached.recursive && o.BaseDir != cached.root { debugPrint("cache %s non rec prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir) diff --git a/cmd/metacache-server-sets.go b/cmd/metacache-server-sets.go index ee75c71af..97aa001da 100644 --- a/cmd/metacache-server-sets.go +++ b/cmd/metacache-server-sets.go @@ -100,6 +100,8 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en rpc = nil o.Transient = true } + // Apply prefix filter if enabled. + o.SetFilter() if rpc == nil || o.Transient { // Local cache = localMetacacheMgr.findCache(ctx, o) diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 9e8948011..58c6faa7d 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -47,6 +47,11 @@ type listPathOptions struct { // Scan/return only content with prefix. Prefix string + // FilterPrefix will return only results with this prefix when scanning. + // Should never contain a slash. + // Prefix should still be set. + FilterPrefix string + // Marker to resume listing. // The response will be the first entry AFTER this object name. Marker string @@ -112,6 +117,7 @@ func (o listPathOptions) newMetacache() metacache { startedCycle: o.CurrentCycle, endedCycle: 0, dataVersion: metacacheStreamVersion, + filter: o.FilterPrefix, } } @@ -279,6 +285,28 @@ func (o *listPathOptions) objectPath(block int) string { return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2") } +func (o *listPathOptions) SetFilter() { + switch { + case metacacheSharePrefix: + return + case o.CurrentCycle != o.OldestCycle: + // We have a clean bloom filter + return + case o.Prefix == o.BaseDir: + // No additional prefix + return + } + // Remove basedir. + o.FilterPrefix = strings.TrimPrefix(o.Prefix, o.BaseDir) + // Remove leading and trailing slashes. + o.FilterPrefix = strings.Trim(o.FilterPrefix, slashSeparator) + + if strings.Contains(o.FilterPrefix, slashSeparator) { + // Sanity check, should not happen. + o.FilterPrefix = "" + } +} + // filter will apply the options and return the number of objects requested by the limit. // Will return io.EOF if there are no more entries with the same filter. // The last entry can be used as a marker to resume the listing. @@ -604,7 +632,11 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr } // Send request to each disk. go func() { - err := d.WalkDir(ctx, WalkDirOptions{Bucket: o.Bucket, BaseDir: o.BaseDir, Recursive: o.Recursive || o.Separator != SlashSeparator}, w) + err := d.WalkDir(ctx, WalkDirOptions{ + Bucket: o.Bucket, + BaseDir: o.BaseDir, + Recursive: o.Recursive || o.Separator != SlashSeparator, + FilterPrefix: o.FilterPrefix}, w) w.CloseWithError(err) if err != io.EOF { logger.LogIf(ctx, err) diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 6d2a755f7..a1b260f2c 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -42,6 +42,10 @@ type WalkDirOptions struct { // Do a full recursive scan. Recursive bool + + // FilterPrefix will only return results with given prefix within folder. + // Should never contain a slash. + FilterPrefix string } // WalkDir will traverse a directory and return all entries found. @@ -85,6 +89,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ } defer close(out) + prefix := opts.FilterPrefix var scanDir func(path string) error scanDir = func(current string) error { entries, err := s.ListDir(ctx, opts.Bucket, current, -1) @@ -98,6 +103,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ } dirObjects := make(map[string]struct{}) for i, entry := range entries { + if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) { + continue + } if strings.HasSuffix(entry, slashSeparator) { if strings.HasSuffix(entry, globalDirSuffixWithSlash) { // Add without extension so it is sorted correctly. @@ -148,6 +156,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ // Process in sort order. sort.Strings(entries) dirStack := make([]string, 0, 5) + prefix = "" // Remove prefix after first level. for _, entry := range entries { if entry == "" { continue @@ -232,6 +241,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption values.Set(storageRESTVolume, opts.Bucket) values.Set(storageRESTDirPath, opts.BaseDir) values.Set(storageRESTRecursive, strconv.FormatBool(opts.Recursive)) + values.Set(storageRESTPrefixFilter, opts.FilterPrefix) respBody, err := client.call(ctx, storageRESTMethodWalkDir, values, nil, -1) if err != nil { logger.LogIf(ctx, err) @@ -248,11 +258,17 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques vars := mux.Vars(r) volume := vars[storageRESTVolume] dirPath := vars[storageRESTDirPath] + prefix := vars[storageRESTPrefixFilter] recursive, err := strconv.ParseBool(vars[storageRESTRecursive]) if err != nil { s.writeErrorResponse(w, err) return } writer := streamHTTPResponse(w) - writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{Bucket: volume, BaseDir: dirPath, Recursive: recursive}, writer)) + writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{ + Bucket: volume, + BaseDir: dirPath, + Recursive: recursive, + FilterPrefix: prefix, + }, writer)) } diff --git a/cmd/metacache.go b/cmd/metacache.go index b6a3a5575..770853274 100644 --- a/cmd/metacache.go +++ b/cmd/metacache.go @@ -40,6 +40,12 @@ const ( // metacacheBlockSize is the number of file/directory entries to have in each block. metacacheBlockSize = 5000 + + // metacacheSharePrefix controls whether prefixes on dirty paths are always shared. + // This will make `test/a` and `test/b` share listings if they are concurrent. + // Enabling this will make cache sharing more likely and cause less IO, + // but may cause additional latency to some calls. + metacacheSharePrefix = false ) //go:generate msgp -file $GOFILE -unexported @@ -50,6 +56,7 @@ type metacache struct { bucket string `msg:"b"` root string `msg:"root"` recursive bool `msg:"rec"` + filter string `msg:"flt"` status scanStatus `msg:"stat"` fileNotFound bool `msg:"fnf"` error string `msg:"err"` @@ -114,16 +121,16 @@ func (m *metacache) canBeReplacedBy(other *metacache) bool { switch { case !m.recursive && !other.recursive: // If both not recursive root must match. - return m.root == other.root + return m.root == other.root && strings.HasPrefix(m.filter, other.filter) case m.recursive && !other.recursive: // A recursive can never be replaced by a non-recursive return false case !m.recursive && other.recursive: // If other is recursive it must contain this root - return strings.HasPrefix(m.root, other.root) + return strings.HasPrefix(m.root, other.root) && other.filter == "" case m.recursive && other.recursive: // Similar if both are recursive - return strings.HasPrefix(m.root, other.root) + return strings.HasPrefix(m.root, other.root) && other.filter == "" } panic("should be unreachable") } diff --git a/cmd/metacache_gen.go b/cmd/metacache_gen.go index c5d8afda7..ef2577490 100644 --- a/cmd/metacache_gen.go +++ b/cmd/metacache_gen.go @@ -48,6 +48,12 @@ func (z *metacache) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "recursive") return } + case "flt": + z.filter, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "filter") + return + } case "stat": { var zb0002 uint8 @@ -125,9 +131,9 @@ func (z *metacache) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 14 + // map header, size 15 // write "id" - err = en.Append(0x8e, 0xa2, 0x69, 0x64) + err = en.Append(0x8f, 0xa2, 0x69, 0x64) if err != nil { return } @@ -166,6 +172,16 @@ func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "recursive") return } + // write "flt" + err = en.Append(0xa3, 0x66, 0x6c, 0x74) + if err != nil { + return + } + err = en.WriteString(z.filter) + if err != nil { + err = msgp.WrapError(err, "filter") + return + } // write "stat" err = en.Append(0xa4, 0x73, 0x74, 0x61, 0x74) if err != nil { @@ -272,9 +288,9 @@ func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 14 + // map header, size 15 // string "id" - o = append(o, 0x8e, 0xa2, 0x69, 0x64) + o = append(o, 0x8f, 0xa2, 0x69, 0x64) o = msgp.AppendString(o, z.id) // string "b" o = append(o, 0xa1, 0x62) @@ -285,6 +301,9 @@ func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) { // string "rec" o = append(o, 0xa3, 0x72, 0x65, 0x63) o = msgp.AppendBool(o, z.recursive) + // string "flt" + o = append(o, 0xa3, 0x66, 0x6c, 0x74) + o = msgp.AppendString(o, z.filter) // string "stat" o = append(o, 0xa4, 0x73, 0x74, 0x61, 0x74) o = msgp.AppendUint8(o, uint8(z.status)) @@ -360,6 +379,12 @@ func (z *metacache) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "recursive") return } + case "flt": + z.filter, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "filter") + return + } case "stat": { var zb0002 uint8 @@ -438,7 +463,7 @@ func (z *metacache) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *metacache) Msgsize() (s int) { - s = 1 + 3 + msgp.StringPrefixSize + len(z.id) + 2 + msgp.StringPrefixSize + len(z.bucket) + 5 + msgp.StringPrefixSize + len(z.root) + 4 + msgp.BoolSize + 5 + msgp.Uint8Size + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.error) + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 2 + msgp.TimeSize + 3 + msgp.TimeSize + 4 + msgp.Uint64Size + 5 + msgp.Uint64Size + 2 + msgp.Uint8Size + s = 1 + 3 + msgp.StringPrefixSize + len(z.id) + 2 + msgp.StringPrefixSize + len(z.bucket) + 5 + msgp.StringPrefixSize + len(z.root) + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.filter) + 5 + msgp.Uint8Size + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.error) + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 2 + msgp.TimeSize + 3 + msgp.TimeSize + 4 + msgp.Uint64Size + 5 + msgp.Uint64Size + 2 + msgp.Uint8Size return } diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 289c13f20..8f5a5a276 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -70,6 +70,7 @@ const ( storageRESTLength = "length" storageRESTCount = "count" storageRESTMarkerPath = "marker" + storageRESTPrefixFilter = "prefix" storageRESTRecursive = "recursive" storageRESTBitrotAlgo = "bitrot-algo" storageRESTBitrotHash = "bitrot-hash" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index d99ae555e..c76292441 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -1083,7 +1083,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerSets Endpoint subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler)). - Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTRecursive)...) + Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTRecursive, storageRESTPrefixFilter)...) } } }