metacache: Allow prefix filtering (#10920)

Do listings with prefix filter when bloom filter is dirty.

This will forward the prefix filter to the lister which will make it 
only scan the folders/objects with the specified prefix.

If we have a clean bloom filter, we try to build a more generally
useful cache; in that case, we will list all objects/folders.
master
Klaus Post 4 years ago committed by GitHub
parent e413f05397
commit 990d074f7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      cmd/metacache-bucket.go
  2. 2
      cmd/metacache-server-sets.go
  3. 34
      cmd/metacache-set.go
  4. 18
      cmd/metacache-walk.go
  5. 13
      cmd/metacache.go
  6. 35
      cmd/metacache_gen.go
  7. 1
      cmd/storage-rest-common.go
  8. 2
      cmd/storage-rest-server.go

@ -228,6 +228,10 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
debugPrint("cache %s prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir) debugPrint("cache %s prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir)
continue continue
} }
if cached.filter != "" && strings.HasPrefix(cached.filter, o.FilterPrefix) {
debugPrint("cache %s cannot be used because of filter %s", cached.id, cached.filter)
continue
}
// If the existing listing wasn't recursive root must match. // If the existing listing wasn't recursive root must match.
if !cached.recursive && o.BaseDir != cached.root { if !cached.recursive && o.BaseDir != cached.root {
debugPrint("cache %s non rec prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir) debugPrint("cache %s non rec prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir)

@ -100,6 +100,8 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
rpc = nil rpc = nil
o.Transient = true o.Transient = true
} }
// Apply prefix filter if enabled.
o.SetFilter()
if rpc == nil || o.Transient { if rpc == nil || o.Transient {
// Local // Local
cache = localMetacacheMgr.findCache(ctx, o) cache = localMetacacheMgr.findCache(ctx, o)

@ -47,6 +47,11 @@ type listPathOptions struct {
// Scan/return only content with prefix. // Scan/return only content with prefix.
Prefix string Prefix string
// FilterPrefix will return only results with this prefix when scanning.
// Should never contain a slash.
// Prefix should still be set.
FilterPrefix string
// Marker to resume listing. // Marker to resume listing.
// The response will be the first entry AFTER this object name. // The response will be the first entry AFTER this object name.
Marker string Marker string
@ -112,6 +117,7 @@ func (o listPathOptions) newMetacache() metacache {
startedCycle: o.CurrentCycle, startedCycle: o.CurrentCycle,
endedCycle: 0, endedCycle: 0,
dataVersion: metacacheStreamVersion, dataVersion: metacacheStreamVersion,
filter: o.FilterPrefix,
} }
} }
@ -279,6 +285,28 @@ func (o *listPathOptions) objectPath(block int) string {
return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2") return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2")
} }
// SetFilter derives FilterPrefix from Prefix and BaseDir.
// A filter is only applied when prefix sharing is disabled, the bloom
// filter is dirty (CurrentCycle == OldestCycle), and the request prefix
// extends beyond the base directory. The resulting FilterPrefix never
// contains a slash; if one remains after trimming, the filter is dropped.
func (o *listPathOptions) SetFilter() {
	if metacacheSharePrefix {
		// Sharing is forced on; never narrow the listing.
		return
	}
	if o.CurrentCycle != o.OldestCycle {
		// We have a clean bloom filter; build a generally useful cache.
		return
	}
	if o.Prefix == o.BaseDir {
		// Nothing beyond the base directory to filter on.
		return
	}
	// Strip the base directory, then any surrounding slashes.
	trimmed := strings.TrimPrefix(o.Prefix, o.BaseDir)
	trimmed = strings.Trim(trimmed, slashSeparator)
	if strings.Contains(trimmed, slashSeparator) {
		// Sanity check, should not happen.
		trimmed = ""
	}
	o.FilterPrefix = trimmed
}
// filter will apply the options and return the number of objects requested by the limit. // filter will apply the options and return the number of objects requested by the limit.
// Will return io.EOF if there are no more entries with the same filter. // Will return io.EOF if there are no more entries with the same filter.
// The last entry can be used as a marker to resume the listing. // The last entry can be used as a marker to resume the listing.
@ -604,7 +632,11 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
} }
// Send request to each disk. // Send request to each disk.
go func() { go func() {
err := d.WalkDir(ctx, WalkDirOptions{Bucket: o.Bucket, BaseDir: o.BaseDir, Recursive: o.Recursive || o.Separator != SlashSeparator}, w) err := d.WalkDir(ctx, WalkDirOptions{
Bucket: o.Bucket,
BaseDir: o.BaseDir,
Recursive: o.Recursive || o.Separator != SlashSeparator,
FilterPrefix: o.FilterPrefix}, w)
w.CloseWithError(err) w.CloseWithError(err)
if err != io.EOF { if err != io.EOF {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)

@ -42,6 +42,10 @@ type WalkDirOptions struct {
// Do a full recursive scan. // Do a full recursive scan.
Recursive bool Recursive bool
// FilterPrefix will only return results with given prefix within folder.
// Should never contain a slash.
FilterPrefix string
} }
// WalkDir will traverse a directory and return all entries found. // WalkDir will traverse a directory and return all entries found.
@ -85,6 +89,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
defer close(out) defer close(out)
prefix := opts.FilterPrefix
var scanDir func(path string) error var scanDir func(path string) error
scanDir = func(current string) error { scanDir = func(current string) error {
entries, err := s.ListDir(ctx, opts.Bucket, current, -1) entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
@ -98,6 +103,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
dirObjects := make(map[string]struct{}) dirObjects := make(map[string]struct{})
for i, entry := range entries { for i, entry := range entries {
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
continue
}
if strings.HasSuffix(entry, slashSeparator) { if strings.HasSuffix(entry, slashSeparator) {
if strings.HasSuffix(entry, globalDirSuffixWithSlash) { if strings.HasSuffix(entry, globalDirSuffixWithSlash) {
// Add without extension so it is sorted correctly. // Add without extension so it is sorted correctly.
@ -148,6 +156,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Process in sort order. // Process in sort order.
sort.Strings(entries) sort.Strings(entries)
dirStack := make([]string, 0, 5) dirStack := make([]string, 0, 5)
prefix = "" // Remove prefix after first level.
for _, entry := range entries { for _, entry := range entries {
if entry == "" { if entry == "" {
continue continue
@ -232,6 +241,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
values.Set(storageRESTVolume, opts.Bucket) values.Set(storageRESTVolume, opts.Bucket)
values.Set(storageRESTDirPath, opts.BaseDir) values.Set(storageRESTDirPath, opts.BaseDir)
values.Set(storageRESTRecursive, strconv.FormatBool(opts.Recursive)) values.Set(storageRESTRecursive, strconv.FormatBool(opts.Recursive))
values.Set(storageRESTPrefixFilter, opts.FilterPrefix)
respBody, err := client.call(ctx, storageRESTMethodWalkDir, values, nil, -1) respBody, err := client.call(ctx, storageRESTMethodWalkDir, values, nil, -1)
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
@ -248,11 +258,17 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques
vars := mux.Vars(r) vars := mux.Vars(r)
volume := vars[storageRESTVolume] volume := vars[storageRESTVolume]
dirPath := vars[storageRESTDirPath] dirPath := vars[storageRESTDirPath]
prefix := vars[storageRESTPrefixFilter]
recursive, err := strconv.ParseBool(vars[storageRESTRecursive]) recursive, err := strconv.ParseBool(vars[storageRESTRecursive])
if err != nil { if err != nil {
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
writer := streamHTTPResponse(w) writer := streamHTTPResponse(w)
writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{Bucket: volume, BaseDir: dirPath, Recursive: recursive}, writer)) writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{
Bucket: volume,
BaseDir: dirPath,
Recursive: recursive,
FilterPrefix: prefix,
}, writer))
} }

@ -40,6 +40,12 @@ const (
// metacacheBlockSize is the number of file/directory entries to have in each block. // metacacheBlockSize is the number of file/directory entries to have in each block.
metacacheBlockSize = 5000 metacacheBlockSize = 5000
// metacacheSharePrefix controls whether prefixes on dirty paths are always shared.
// This will make `test/a` and `test/b` share listings if they are concurrent.
// Enabling this will make cache sharing more likely and cause less IO,
// but may cause additional latency to some calls.
metacacheSharePrefix = false
) )
//go:generate msgp -file $GOFILE -unexported //go:generate msgp -file $GOFILE -unexported
@ -50,6 +56,7 @@ type metacache struct {
bucket string `msg:"b"` bucket string `msg:"b"`
root string `msg:"root"` root string `msg:"root"`
recursive bool `msg:"rec"` recursive bool `msg:"rec"`
filter string `msg:"flt"`
status scanStatus `msg:"stat"` status scanStatus `msg:"stat"`
fileNotFound bool `msg:"fnf"` fileNotFound bool `msg:"fnf"`
error string `msg:"err"` error string `msg:"err"`
@ -114,16 +121,16 @@ func (m *metacache) canBeReplacedBy(other *metacache) bool {
switch { switch {
case !m.recursive && !other.recursive: case !m.recursive && !other.recursive:
// If both not recursive root must match. // If both not recursive root must match.
return m.root == other.root return m.root == other.root && strings.HasPrefix(m.filter, other.filter)
case m.recursive && !other.recursive: case m.recursive && !other.recursive:
// A recursive can never be replaced by a non-recursive // A recursive can never be replaced by a non-recursive
return false return false
case !m.recursive && other.recursive: case !m.recursive && other.recursive:
// If other is recursive it must contain this root // If other is recursive it must contain this root
return strings.HasPrefix(m.root, other.root) return strings.HasPrefix(m.root, other.root) && other.filter == ""
case m.recursive && other.recursive: case m.recursive && other.recursive:
// Similar if both are recursive // Similar if both are recursive
return strings.HasPrefix(m.root, other.root) return strings.HasPrefix(m.root, other.root) && other.filter == ""
} }
panic("should be unreachable") panic("should be unreachable")
} }

@ -48,6 +48,12 @@ func (z *metacache) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "recursive") err = msgp.WrapError(err, "recursive")
return return
} }
case "flt":
z.filter, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "filter")
return
}
case "stat": case "stat":
{ {
var zb0002 uint8 var zb0002 uint8
@ -125,9 +131,9 @@ func (z *metacache) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable // EncodeMsg implements msgp.Encodable
func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) { func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 14 // map header, size 15
// write "id" // write "id"
err = en.Append(0x8e, 0xa2, 0x69, 0x64) err = en.Append(0x8f, 0xa2, 0x69, 0x64)
if err != nil { if err != nil {
return return
} }
@ -166,6 +172,16 @@ func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "recursive") err = msgp.WrapError(err, "recursive")
return return
} }
// write "flt"
err = en.Append(0xa3, 0x66, 0x6c, 0x74)
if err != nil {
return
}
err = en.WriteString(z.filter)
if err != nil {
err = msgp.WrapError(err, "filter")
return
}
// write "stat" // write "stat"
err = en.Append(0xa4, 0x73, 0x74, 0x61, 0x74) err = en.Append(0xa4, 0x73, 0x74, 0x61, 0x74)
if err != nil { if err != nil {
@ -272,9 +288,9 @@ func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) { func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// map header, size 14 // map header, size 15
// string "id" // string "id"
o = append(o, 0x8e, 0xa2, 0x69, 0x64) o = append(o, 0x8f, 0xa2, 0x69, 0x64)
o = msgp.AppendString(o, z.id) o = msgp.AppendString(o, z.id)
// string "b" // string "b"
o = append(o, 0xa1, 0x62) o = append(o, 0xa1, 0x62)
@ -285,6 +301,9 @@ func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) {
// string "rec" // string "rec"
o = append(o, 0xa3, 0x72, 0x65, 0x63) o = append(o, 0xa3, 0x72, 0x65, 0x63)
o = msgp.AppendBool(o, z.recursive) o = msgp.AppendBool(o, z.recursive)
// string "flt"
o = append(o, 0xa3, 0x66, 0x6c, 0x74)
o = msgp.AppendString(o, z.filter)
// string "stat" // string "stat"
o = append(o, 0xa4, 0x73, 0x74, 0x61, 0x74) o = append(o, 0xa4, 0x73, 0x74, 0x61, 0x74)
o = msgp.AppendUint8(o, uint8(z.status)) o = msgp.AppendUint8(o, uint8(z.status))
@ -360,6 +379,12 @@ func (z *metacache) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "recursive") err = msgp.WrapError(err, "recursive")
return return
} }
case "flt":
z.filter, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "filter")
return
}
case "stat": case "stat":
{ {
var zb0002 uint8 var zb0002 uint8
@ -438,7 +463,7 @@ func (z *metacache) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *metacache) Msgsize() (s int) { func (z *metacache) Msgsize() (s int) {
s = 1 + 3 + msgp.StringPrefixSize + len(z.id) + 2 + msgp.StringPrefixSize + len(z.bucket) + 5 + msgp.StringPrefixSize + len(z.root) + 4 + msgp.BoolSize + 5 + msgp.Uint8Size + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.error) + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 2 + msgp.TimeSize + 3 + msgp.TimeSize + 4 + msgp.Uint64Size + 5 + msgp.Uint64Size + 2 + msgp.Uint8Size s = 1 + 3 + msgp.StringPrefixSize + len(z.id) + 2 + msgp.StringPrefixSize + len(z.bucket) + 5 + msgp.StringPrefixSize + len(z.root) + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.filter) + 5 + msgp.Uint8Size + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.error) + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 2 + msgp.TimeSize + 3 + msgp.TimeSize + 4 + msgp.Uint64Size + 5 + msgp.Uint64Size + 2 + msgp.Uint8Size
return return
} }

@ -70,6 +70,7 @@ const (
storageRESTLength = "length" storageRESTLength = "length"
storageRESTCount = "count" storageRESTCount = "count"
storageRESTMarkerPath = "marker" storageRESTMarkerPath = "marker"
storageRESTPrefixFilter = "prefix"
storageRESTRecursive = "recursive" storageRESTRecursive = "recursive"
storageRESTBitrotAlgo = "bitrot-algo" storageRESTBitrotAlgo = "bitrot-algo"
storageRESTBitrotHash = "bitrot-hash" storageRESTBitrotHash = "bitrot-hash"

@ -1083,7 +1083,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerSets Endpoint
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFileHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTRecursive)...) Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTRecursive, storageRESTPrefixFilter)...)
} }
} }
} }

Loading…
Cancel
Save