From 073aac3d92c8986e353b60ed3214a9c47c123018 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 27 Apr 2020 19:06:21 +0200 Subject: [PATCH] add data update tracking using bloom filter (#9208) By monitoring PUT/DELETE and heal operations it is possible to track changed paths and keep a bloom filter for this data. This can help prioritize paths to scan. The bloom filter can identify paths that have not changed, and the few collisions will only result in a marginal extra workload. This can be implemented on either a bucket+(1 prefix level) with reasonable performance. The bloom filter is set to have a false positive rate at 1% at 1M entries. A bloom table of this size is about ~2500 bytes when serialized. To not force a full scan of all paths that have changed cycle bloom filters would need to be kept, so we guarantee that dirty paths have been scanned within cycle runs. Until cycle bloom filters have been collected all paths are considered dirty. --- cmd/background-heal-ops.go | 2 + cmd/data-update-tracker.go | 607 ++++++++++++++++++++++++++++++++ cmd/data-update-tracker_test.go | 262 ++++++++++++++ cmd/data-usage-cache.go | 47 ++- cmd/data-usage-cache_gen.go | 78 +++- cmd/data-usage.go | 103 +++++- cmd/erasure-encode_test.go | 4 + cmd/fs-v1-multipart.go | 1 + cmd/fs-v1.go | 28 +- cmd/gateway-unsupported.go | 2 +- cmd/notification.go | 75 +++- cmd/object-api-interface.go | 2 +- cmd/peer-rest-client.go | 19 + cmd/peer-rest-common.go | 1 + cmd/peer-rest-server.go | 25 ++ cmd/server-main.go | 1 - cmd/xl-sets.go | 3 +- cmd/xl-v1-healing.go | 3 + cmd/xl-v1-multipart.go | 2 + cmd/xl-v1-object.go | 6 + cmd/xl-v1.go | 37 +- cmd/xl-zones.go | 14 +- go.mod | 3 + go.sum | 6 + 24 files changed, 1270 insertions(+), 61 deletions(-) create mode 100644 cmd/data-update-tracker.go create mode 100644 cmd/data-update-tracker_test.go diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index a3650720b..1659d5939 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "path" "time" "github.com/minio/minio/cmd/logger" @@ -88,6 +89,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { case bucket != "" && object != "": res, err = objAPI.HealObject(ctx, bucket, object, task.opts) } + ObjectPathUpdated(path.Join(bucket, object)) if task.responseCh != nil { task.responseCh <- healResult{result: res, err: err} } diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go new file mode 100644 index 000000000..4aec4381a --- /dev/null +++ b/cmd/data-update-tracker.go @@ -0,0 +1,607 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bufio" + "bytes" + "context" + "encoding/binary" + "errors" + "io" + "io/ioutil" + "os" + "path" + "sort" + "strings" + "sync" + "time" + + "github.com/minio/minio/cmd/config" + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/color" + "github.com/minio/minio/pkg/env" + "github.com/willf/bloom" +) + +const ( + // Estimate bloom filter size. With this many items + dataUpdateTrackerEstItems = 1000000 + // ... we want this false positive rate: + dataUpdateTrackerFP = 0.99 + dataUpdateTrackerQueueSize = 10000 + + dataUpdateTrackerVersion = 1 + dataUpdateTrackerFilename = minioMetaBucket + SlashSeparator + bucketMetaPrefix + SlashSeparator + ".tracker.bin" + dataUpdateTrackerSaveInterval = 5 * time.Minute + + // Reset bloom filters every n cycle + dataUpdateTrackerResetEvery = 1000 +) + +var ( + objectUpdatedCh chan<- string + intDataUpdateTracker *dataUpdateTracker +) + +func init() { + intDataUpdateTracker = newDataUpdateTracker() + objectUpdatedCh = intDataUpdateTracker.input +} + +type dataUpdateTracker struct { + mu sync.Mutex + input chan string + save chan struct{} + debug bool + saveExited chan struct{} + + Current dataUpdateFilter + History dataUpdateTrackerHistory + Saved time.Time +} + +// newDataUpdateTracker returns a dataUpdateTracker with default settings. +func newDataUpdateTracker() *dataUpdateTracker { + d := &dataUpdateTracker{ + Current: dataUpdateFilter{ + idx: 1, + }, + debug: env.Get(envDataUsageCrawlDebug, config.EnableOff) == config.EnableOn, + input: make(chan string, dataUpdateTrackerQueueSize), + save: make(chan struct{}, 1), + saveExited: make(chan struct{}), + } + d.Current.bf = d.newBloomFilter() + return d +} + +type dataUpdateTrackerHistory []dataUpdateFilter + +type dataUpdateFilter struct { + idx uint64 + bf bloomFilter +} + +type bloomFilter struct { + *bloom.BloomFilter +} + +// emptyBloomFilter returns an empty bloom filter. +func emptyBloomFilter() bloomFilter { + return bloomFilter{BloomFilter: &bloom.BloomFilter{}} +} + +// containsDir returns whether the bloom filter contains a directory. +// Note that objects in XL mode are also considered directories. +func (b bloomFilter) containsDir(in string) bool { + split := splitPathDeterministic(path.Clean(in)) + + if len(split) == 0 { + return false + } + var tmp [dataUsageHashLen]byte + hashPath(path.Join(split...)).bytes(tmp[:]) + return b.Test(tmp[:]) +} + +// bytes returns the bloom filter serialized as a byte slice. +func (b bloomFilter) bytes() []byte { + if b.BloomFilter == nil { + return nil + } + var buf bytes.Buffer + _, err := b.WriteTo(&buf) + if err != nil { + logger.LogIf(GlobalContext, err) + return nil + } + return buf.Bytes() +} + +// sort the dataUpdateTrackerHistory, newest first. +// Returns whether the history is complete. +func (d dataUpdateTrackerHistory) sort() bool { + if len(d) == 0 { + return true + } + sort.Slice(d, func(i, j int) bool { + return d[i].idx > d[j].idx + }) + return d[0].idx-d[len(d)-1].idx == uint64(len(d)) +} + +// removeOlderThan will remove entries older than index 'n'. +func (d *dataUpdateTrackerHistory) removeOlderThan(n uint64) { + d.sort() + dd := *d + end := len(dd) + for i := end - 1; i >= 0; i-- { + if dd[i].idx < n { + end = i + } + } + dd = dd[:end] + *d = dd +} + +// newBloomFilter returns a new bloom filter with default settings. +func (d *dataUpdateTracker) newBloomFilter() bloomFilter { + return bloomFilter{bloom.NewWithEstimates(dataUpdateTrackerEstItems, dataUpdateTrackerFP)} +} + +// current returns the current index. +func (d *dataUpdateTracker) current() uint64 { + d.mu.Lock() + defer d.mu.Unlock() + return d.Current.idx +} + +// start will load the current data from the drives start collecting information and +// start a saver goroutine. +// All of these will exit when the context is canceled. +func (d *dataUpdateTracker) start(ctx context.Context, drives ...string) { + if len(drives) <= 0 { + logger.LogIf(ctx, errors.New("dataUpdateTracker.start: No drives specified")) + return + } + d.load(ctx, drives...) + go d.startCollector(ctx) + go d.startSaver(ctx, dataUpdateTrackerSaveInterval, drives) +} + +// load will attempt to load data tracking information from the supplied drives. +// The data will only be loaded if d.Saved is older than the one found on disk. +// The newest working cache will be kept in d. +// If no valid data usage tracker can be found d will remain unchanged. +// If object is shared the caller should lock it. +func (d *dataUpdateTracker) load(ctx context.Context, drives ...string) { + if len(drives) <= 0 { + logger.LogIf(ctx, errors.New("dataUpdateTracker.load: No drives specified")) + return + } + for _, drive := range drives { + cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename) + f, err := os.Open(cacheFormatPath) + if err != nil { + if os.IsNotExist(err) { + continue + } + logger.LogIf(ctx, err) + continue + } + err = d.deserialize(f, d.Saved) + if err != nil { + logger.LogIf(ctx, err) + } + f.Close() + } +} + +// startSaver will start a saver that will write d to all supplied drives at specific intervals. +// The saver will save and exit when supplied context is closed. +func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives []string) { + t := time.NewTicker(interval) + defer t.Stop() + var buf bytes.Buffer + d.mu.Lock() + saveNow := d.save + exited := make(chan struct{}) + d.saveExited = exited + d.mu.Unlock() + defer close(exited) + for { + var exit bool + select { + case <-ctx.Done(): + exit = true + case <-t.C: + case <-saveNow: + } + buf.Reset() + d.mu.Lock() + d.Saved = UTCNow() + err := d.serialize(&buf) + if d.debug { + logger.Info(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v", buf.Len(), d.Current.idx) + } + d.mu.Unlock() + if err != nil { + logger.LogIf(ctx, err, "Error serializing usage tracker data") + if exit { + return + } + continue + } + if buf.Len() == 0 { + logger.LogIf(ctx, errors.New("zero sized output, skipping save")) + continue + } + for _, drive := range drives { + cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename) + err := ioutil.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm) + if err != nil { + logger.LogIf(ctx, err) + continue + } + } + if exit { + return + } + } +} + +// serialize all data in d to dst. +// Caller should hold lock if d is expected to be shared. +// If an error is returned, there will likely be partial data written to dst. +func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) { + ctx := GlobalContext + var tmp [8]byte + o := bufio.NewWriter(dst) + defer func() { + if err == nil { + err = o.Flush() + } + }() + + // Version + if err := o.WriteByte(dataUpdateTrackerVersion); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + // Timestamp. + binary.LittleEndian.PutUint64(tmp[:], uint64(d.Saved.Unix())) + if _, err := o.Write(tmp[:]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + // Current + binary.LittleEndian.PutUint64(tmp[:], d.Current.idx) + if _, err := o.Write(tmp[:]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + if _, err := d.Current.bf.WriteTo(o); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + // History + binary.LittleEndian.PutUint64(tmp[:], uint64(len(d.History))) + if _, err := o.Write(tmp[:]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + for _, bf := range d.History { + // Current + binary.LittleEndian.PutUint64(tmp[:], bf.idx) + if _, err := o.Write(tmp[:]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + if _, err := bf.bf.WriteTo(o); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + } + return nil +} + +// deserialize will deserialize the supplied input if the input is newer than the supplied time. +func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) error { + ctx := GlobalContext + var dst dataUpdateTracker + var tmp [8]byte + + // Version + if _, err := io.ReadFull(src, tmp[:1]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + switch tmp[0] { + case dataUpdateTrackerVersion: + default: + return errors.New("dataUpdateTracker: Unknown data version") + } + // Timestamp. + if _, err := io.ReadFull(src, tmp[:8]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + t := time.Unix(int64(binary.LittleEndian.Uint64(tmp[:])), 0) + if !t.After(newerThan) { + return nil + } + + // Current + if _, err := io.ReadFull(src, tmp[:8]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + dst.Current.idx = binary.LittleEndian.Uint64(tmp[:]) + dst.Current.bf = emptyBloomFilter() + if _, err := dst.Current.bf.ReadFrom(src); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + // History + if _, err := io.ReadFull(src, tmp[:8]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + n := binary.LittleEndian.Uint64(tmp[:]) + dst.History = make(dataUpdateTrackerHistory, int(n)) + for i, e := range dst.History { + if _, err := io.ReadFull(src, tmp[:8]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + e.idx = binary.LittleEndian.Uint64(tmp[:]) + e.bf = emptyBloomFilter() + if _, err := e.bf.ReadFrom(src); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + dst.History[i] = e + } + // Ignore what remains on the stream. + // Update d: + d.Current = dst.Current + d.History = dst.History + d.Saved = dst.Saved + return nil +} + +// start a collector that picks up entries from objectUpdatedCh +// and adds them to the current bloom filter. +func (d *dataUpdateTracker) startCollector(ctx context.Context) { + var tmp [dataUsageHashLen]byte + for { + select { + case <-ctx.Done(): + return + case in := <-d.input: + bucket, _ := path2BucketObjectWithBasePath("", in) + if bucket == "" { + if d.debug && len(in) > 0 { + logger.Info(color.Green("data-usage:")+" no bucket (%s)", in) + } + continue + } + + if isReservedOrInvalidBucket(bucket, false) { + if false && d.debug { + logger.Info(color.Green("data-usage:")+" isReservedOrInvalidBucket: %v, entry: %v", bucket, in) + } + continue + } + split := splitPathDeterministic(in) + + // Add all paths until level 3. + d.mu.Lock() + for i := range split { + if d.debug && false { + logger.Info(color.Green("dataUpdateTracker:") + " Marking path dirty: " + color.Blue(path.Join(split[:i+1]...))) + } + hashPath(path.Join(split[:i+1]...)).bytes(tmp[:]) + d.Current.bf.Add(tmp[:]) + } + d.mu.Unlock() + } + } +} + +// find entry with specified index. +// Returns nil if not found. +func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter { + for _, f := range d { + if f.idx == idx { + return &f + } + } + return nil +} + +// filterFrom will return a combined bloom filter. +func (d *dataUpdateTracker) filterFrom(ctx context.Context, oldest, newest uint64) *bloomFilterResponse { + bf := d.newBloomFilter() + bfr := bloomFilterResponse{ + OldestIdx: oldest, + CurrentIdx: d.Current.idx, + Complete: true, + } + // Loop through each index requested. + for idx := oldest; idx <= newest; idx++ { + v := d.History.find(idx) + if v == nil { + if d.Current.idx == idx { + // Merge current. + err := bf.Merge(d.Current.bf.BloomFilter) + logger.LogIf(ctx, err) + if err != nil { + bfr.Complete = false + } + continue + } + bfr.Complete = false + bfr.OldestIdx = idx + 1 + continue + } + + err := bf.Merge(v.bf.BloomFilter) + if err != nil { + bfr.Complete = false + logger.LogIf(ctx, err) + continue + } + bfr.NewestIdx = idx + } + var dst bytes.Buffer + _, err := bf.WriteTo(&dst) + if err != nil { + logger.LogIf(ctx, err) + return nil + } + bfr.Filter = dst.Bytes() + + return &bfr +} + +// cycleFilter will cycle the bloom filter to start recording to index y if not already. +// The response will contain a bloom filter starting at index x up to, but not including index y. +// If y is 0, the response will not update y, but return the currently recorded information +// from the up until and including current y. +func (d *dataUpdateTracker) cycleFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) { + d.mu.Lock() + defer d.mu.Unlock() + + if current == 0 { + if len(d.History) == 0 { + return d.filterFrom(ctx, d.Current.idx, d.Current.idx), nil + } + d.History.sort() + return d.filterFrom(ctx, d.History[len(d.History)-1].idx, d.Current.idx), nil + } + + // Move current to history if new one requested + if d.Current.idx != current { + if d.debug { + logger.Info(color.Green("dataUpdateTracker:")+" cycle bloom filter: %v -> %v", d.Current.idx, current) + } + + d.History = append(d.History, d.Current) + d.Current.idx = current + d.Current.bf = d.newBloomFilter() + select { + case d.save <- struct{}{}: + default: + } + } + d.History.removeOlderThan(oldest) + return d.filterFrom(ctx, oldest, current), nil +} + +// splitPathDeterministic will split the provided relative path +// deterministically and return up to the first 3 elements of the path. +// Slash and dot prefixes are removed. +// Trailing slashes are removed. +// Returns 0 length if no parts are found after trimming. +func splitPathDeterministic(in string) []string { + split := strings.Split(in, SlashSeparator) + + // Trim empty start/end + for len(split) > 0 { + if len(split[0]) > 0 && split[0] != "." { + break + } + split = split[1:] + } + for len(split) > 0 { + if len(split[len(split)-1]) > 0 { + break + } + split = split[:len(split)-1] + } + + // Return up to 3 parts. + if len(split) > 3 { + split = split[:3] + } + return split +} + +// bloomFilterRequest request bloom filters. +// Current index will be updated to current and entries back to Oldest is returned. +type bloomFilterRequest struct { + Oldest uint64 + Current uint64 +} + +type bloomFilterResponse struct { + // Current index being written to. + CurrentIdx uint64 + // Oldest index in the returned bloom filter. + OldestIdx uint64 + // Newest Index in the returned bloom filter. + NewestIdx uint64 + // Are all indexes between oldest and newest filled? + Complete bool + // Binary data of the bloom filter. + Filter []byte +} + +// ObjectPathUpdated indicates a path has been updated. +// The function will never block. +func ObjectPathUpdated(s string) { + select { + case objectUpdatedCh <- s: + default: + } +} diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go new file mode 100644 index 000000000..9af0c9d11 --- /dev/null +++ b/cmd/data-update-tracker_test.go @@ -0,0 +1,262 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "math/rand" + "os" + "path" + "path/filepath" + "sync" + "testing" + + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/cmd/logger/message/log" +) + +type testLoggerI interface { + Helper() + Log(args ...interface{}) +} + +type testingLogger struct { + mu sync.Mutex + t testLoggerI +} + +func (t *testingLogger) Send(entry interface{}, errKind string) error { + t.mu.Lock() + defer t.mu.Unlock() + if t.t == nil { + return nil + } + e, ok := entry.(log.Entry) + if !ok { + return fmt.Errorf("unexpected log entry structure %#v", entry) + } + + t.t.Helper() + t.t.Log(e.Level, ":", errKind, e.Message) + return nil +} + +func addTestingLogging(t testLoggerI) func() { + tl := &testingLogger{t: t} + logger.AddTarget(tl) + return func() { + tl.mu.Lock() + defer tl.mu.Unlock() + tl.t = nil + } +} + +func TestDataUpdateTracker(t *testing.T) { + dut := newDataUpdateTracker() + // Change some defaults. + dut.debug = testing.Verbose() + dut.input = make(chan string) + dut.save = make(chan struct{}) + + defer addTestingLogging(t)() + + dut.Current.bf = dut.newBloomFilter() + + tmpDir, err := ioutil.TempDir("", "TestDataUpdateTracker") + if err != nil { + t.Fatal(err) + } + err = os.MkdirAll(filepath.Dir(filepath.Join(tmpDir, dataUpdateTrackerFilename)), os.ModePerm) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dut.start(ctx, tmpDir) + + var tests = []struct { + in string + check []string // if not empty, check against these instead. + exist bool + }{ + { + in: "bucket/directory/file.txt", + check: []string{"bucket", "bucket/", "/bucket", "bucket/directory", "bucket/directory/", "bucket/directory/file.txt", "/bucket/directory/file.txt"}, + exist: true, + }, + { + // System bucket + in: ".minio.sys/ignoreme/pls", + exist: false, + }, + { + // Not a valid bucket + in: "./bucket/okfile.txt", + check: []string{"./bucket/okfile.txt", "/bucket/okfile.txt", "bucket/okfile.txt"}, + exist: false, + }, + { + // Not a valid bucket + in: "æ/okfile.txt", + check: []string{"æ/okfile.txt", "æ/okfile.txt", "æ"}, + exist: false, + }, + { + in: "/bucket2/okfile2.txt", + check: []string{"./bucket2/okfile2.txt", "/bucket2/okfile2.txt", "bucket2/okfile2.txt", "bucket2"}, + exist: true, + }, + { + in: "/bucket3/prefix/okfile2.txt", + check: []string{"./bucket3/prefix/okfile2.txt", "/bucket3/prefix/okfile2.txt", "bucket3/prefix/okfile2.txt", "bucket3/prefix", "bucket3"}, + exist: true, + }, + } + for _, tt := range tests { + t.Run(tt.in, func(t *testing.T) { + dut.input <- tt.in + dut.input <- "" // Sending empty string ensures the previous is added to filter. + dut.mu.Lock() + defer dut.mu.Unlock() + if len(tt.check) == 0 { + got := dut.Current.bf.containsDir(tt.in) + if got != tt.exist { + // For unlimited tests this could lead to false positives, + // but it should be deterministic. + t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist) + } + return + } + for _, check := range tt.check { + got := dut.Current.bf.containsDir(check) + if got != tt.exist { + // For unlimited tests this could lead to false positives, + // but it should be deterministic. + t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist) + } + continue + } + }) + } + // Cycle to history + _, err = dut.cycleFilter(ctx, 1, 2) + if err != nil { + t.Fatal(err) + } + dut.input <- "cycle2/file.txt" + dut.input <- "" // Sending empty string ensures the previous is added to filter. + + tests = append(tests, struct { + in string + check []string + exist bool + }{in: "cycle2/file.txt", exist: true}) + + // Shut down + cancel() + <-dut.saveExited + + if dut.current() != 2 { + t.Fatal("wrong current idx after save. want 2, got:", dut.current()) + } + + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + // Reload... + dut = newDataUpdateTracker() + dut.start(ctx, tmpDir) + + if dut.current() != 2 { + t.Fatal("current idx after load not preserved. want 2, got:", dut.current()) + } + bfr2, err := dut.cycleFilter(ctx, 1, 3) + if err != nil { + t.Fatal(err) + } + if !bfr2.Complete { + t.Fatal("Wanted complete, didn't get it") + } + if bfr2.CurrentIdx != 3 { + t.Fatal("wanted index 3, got", bfr2.CurrentIdx) + } + if bfr2.OldestIdx != 1 { + t.Fatal("wanted oldest index 3, got", bfr2.OldestIdx) + } + + // Rerun test with returned bfr2 + bf := dut.newBloomFilter() + _, err = bf.ReadFrom(bytes.NewBuffer(bfr2.Filter)) + if err != nil { + t.Fatal(err) + } + for _, tt := range tests { + t.Run(tt.in+"-reloaded", func(t *testing.T) { + if len(tt.check) == 0 { + got := bf.containsDir(tt.in) + if got != tt.exist { + // For unlimited tests this could lead to false positives, + // but it should be deterministic. + t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist) + } + return + } + for _, check := range tt.check { + got := bf.containsDir(check) + if got != tt.exist { + // For unlimited tests this could lead to false positives, + // but it should be deterministic. + t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist) + } + continue + } + }) + } +} + +func BenchmarkDataUpdateTracker(b *testing.B) { + dut := newDataUpdateTracker() + // Change some defaults. + dut.debug = false + dut.input = make(chan string) + dut.save = make(chan struct{}) + + defer addTestingLogging(b)() + + dut.Current.bf = dut.newBloomFilter() + // We do this unbuffered. This will very significantly reduce throughput, so this is a worst case. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go dut.startCollector(ctx) + input := make([]string, 1000) + rng := rand.New(rand.NewSource(0xabad1dea)) + tmp := []string{"bucket", "aprefix", "nextprefixlevel", "maybeobjname", "evendeeper", "ok-one-morelevel", "final.object"} + for i := range input { + tmp := tmp[:1+rng.Intn(cap(tmp)-1)] + input[i] = path.Join(tmp...) + } + b.SetBytes(1) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + dut.input <- input[rng.Intn(len(input))] + } +} diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index 09df4742a..39616e36a 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "io" "path" @@ -62,9 +63,10 @@ type dataUsageEntryInfo struct { type dataUsageCacheInfo struct { // Name of the bucket. Also root element. - Name string - LastUpdate time.Time - NextCycle uint8 + Name string + LastUpdate time.Time + NextCycle uint32 + BloomFilter []byte `msg:"BloomFilter,omitempty"` } // merge other data usage entry into this, excluding children. @@ -77,8 +79,8 @@ func (e *dataUsageEntry) merge(other dataUsageEntry) { } // mod returns true if the hash mod cycles == cycle. -func (h dataUsageHash) mod(cycle uint8, cycles uint8) bool { - return uint8(h)%cycles == cycle%cycles +func (h dataUsageHash) mod(cycle uint32, cycles uint32) bool { + return uint32(h)%cycles == cycle%cycles } // addChildString will add a child based on its name. @@ -110,6 +112,7 @@ func (d *dataUsageCache) find(path string) *dataUsageEntry { } // dui converts the flattened version of the path to DataUsageInfo. +// As a side effect d will be flattened, use a clone if this is not ok. func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo { e := d.find(path) if e == nil { @@ -158,6 +161,32 @@ func (d *dataUsageCache) replaceHashed(hash dataUsageHash, parent *dataUsageHash } } +// copyWithChildren will copy entry with hash from src if it exists along with any children. +// If a parent is specified it will be added to that if not already there. +// If the parent does not exist, it will be added. +func (d *dataUsageCache) copyWithChildren(src *dataUsageCache, hash dataUsageHash, parent *dataUsageHash) { + if d.Cache == nil { + d.Cache = make(map[dataUsageHash]dataUsageEntry, 100) + } + e, ok := src.Cache[hash] + if !ok { + return + } + d.Cache[hash] = e + for ch := range e.Children { + if ch == hash { + logger.LogIf(GlobalContext, errors.New("dataUsageCache.copyWithChildren: Circular reference")) + return + } + d.copyWithChildren(src, ch, &hash) + } + if parent != nil { + p := d.Cache[*parent] + p.addChild(hash) + d.Cache[*parent] = p + } +} + // StringAll returns a detailed string representation of all entries in the cache. func (d *dataUsageCache) StringAll() string { s := fmt.Sprintf("info:%+v\n", d.Info) @@ -167,6 +196,12 @@ func (d *dataUsageCache) StringAll() string { return strings.TrimSpace(s) } +// insert the hash into dst. +// dst must be at least dataUsageHashLen bytes long. +func (h dataUsageHash) bytes(dst []byte) { + binary.LittleEndian.PutUint64(dst, uint64(h)) +} + // String returns a human readable representation of the string. func (h dataUsageHash) String() string { return fmt.Sprintf("%x", uint64(h)) @@ -297,7 +332,7 @@ func (d *dataUsageCache) load(ctx context.Context, store ObjectLayer, name strin var buf bytes.Buffer err := store.GetObject(ctx, dataUsageBucket, name, 0, -1, &buf, "", ObjectOptions{}) if err != nil { - if !isErrObjectNotFound(err) { + if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) { return toObjectErr(err, dataUsageBucket, name) } *d = dataUsageCache{} diff --git a/cmd/data-usage-cache_gen.go b/cmd/data-usage-cache_gen.go index 0434254a1..2c4209cd7 100644 --- a/cmd/data-usage-cache_gen.go +++ b/cmd/data-usage-cache_gen.go @@ -37,11 +37,17 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) { return } case "NextCycle": - z.NextCycle, err = dc.ReadUint8() + z.NextCycle, err = dc.ReadUint32() if err != nil { err = msgp.WrapError(err, "NextCycle") return } + case "BloomFilter": + z.BloomFilter, err = dc.ReadBytes(z.BloomFilter) + if err != nil { + err = msgp.WrapError(err, "BloomFilter") + return + } default: err = dc.Skip() if err != nil { @@ -54,10 +60,24 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 3 +func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { + // omitempty: check for empty values + zb0001Len := uint32(4) + var zb0001Mask uint8 /* 4 bits */ + if z.BloomFilter == nil { + zb0001Len-- + zb0001Mask |= 0x8 + } + // variable map header, size zb0001Len + err = en.Append(0x80 | uint8(zb0001Len)) + if err != nil { + return + } + if zb0001Len == 0 { + return + } // write "Name" - err = en.Append(0x83, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + err = en.Append(0xa4, 0x4e, 0x61, 0x6d, 0x65) if err != nil { return } @@ -81,27 +101,55 @@ func (z dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - err = en.WriteUint8(z.NextCycle) + err = en.WriteUint32(z.NextCycle) if err != nil { err = msgp.WrapError(err, "NextCycle") return } + if (zb0001Mask & 0x8) == 0 { // if not empty + // write "BloomFilter" + err = en.Append(0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72) + if err != nil { + return + } + err = en.WriteBytes(z.BloomFilter) + if err != nil { + err = msgp.WrapError(err, "BloomFilter") + return + } + } return } // MarshalMsg implements msgp.Marshaler -func (z dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) { +func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 3 + // omitempty: check for empty values + zb0001Len := uint32(4) + var zb0001Mask uint8 /* 4 bits */ + if z.BloomFilter == nil { + zb0001Len-- + zb0001Mask |= 0x8 + } + // variable map header, size zb0001Len + o = append(o, 0x80|uint8(zb0001Len)) + if zb0001Len == 0 { + return + } // string "Name" - o = append(o, 0x83, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.Name) // string "LastUpdate" o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) o = msgp.AppendTime(o, z.LastUpdate) // string "NextCycle" o = append(o, 0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65) - o = msgp.AppendUint8(o, z.NextCycle) + o = msgp.AppendUint32(o, z.NextCycle) + if (zb0001Mask & 0x8) == 0 { // if not empty + // string "BloomFilter" + o = append(o, 0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72) + o = msgp.AppendBytes(o, z.BloomFilter) + } return } @@ -136,11 +184,17 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { return } case "NextCycle": - z.NextCycle, bts, err = msgp.ReadUint8Bytes(bts) + z.NextCycle, bts, err = msgp.ReadUint32Bytes(bts) if err != nil { err = msgp.WrapError(err, "NextCycle") return } + case "BloomFilter": + z.BloomFilter, bts, err = msgp.ReadBytesBytes(bts, z.BloomFilter) + if err != nil { + err = msgp.WrapError(err, "BloomFilter") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -154,8 +208,8 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z dataUsageCacheInfo) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint8Size +func (z *dataUsageCacheInfo) Msgsize() (s int) { + s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint32Size + 12 + msgp.BytesPrefixSize + len(z.BloomFilter) return } diff --git a/cmd/data-usage.go b/cmd/data-usage.go index 9b9e95e83..a4c115042 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "context" + "encoding/binary" "encoding/json" "errors" "os" @@ -32,6 +33,7 @@ import ( "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/hash" + "github.com/willf/bloom" ) const ( @@ -45,6 +47,7 @@ const ( dataUsageUpdateDirCycles = 16 dataUsageRoot = SlashSeparator dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix + dataUsageBloomName = ".bloomcycle.bin" dataUsageStartDelay = 5 * time.Minute // Time to wait on startup and between cycles. ) @@ -56,6 +59,20 @@ func initDataUsageStats(ctx context.Context, objAPI ObjectLayer) { } func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) { + // Load current bloom cycle + nextBloomCycle := intDataUpdateTracker.current() + 1 + var buf bytes.Buffer + err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageBloomName, 0, -1, &buf, "", ObjectOptions{}) + if err != nil { + if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) { + logger.LogIf(ctx, err) + } + } else { + if buf.Len() == 8 { + nextBloomCycle = binary.LittleEndian.Uint64(buf.Bytes()) + } + } + for { select { case <-ctx.Done(): @@ -64,9 +81,31 @@ func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) { // Wait before starting next cycle and wait on startup. results := make(chan DataUsageInfo, 1) go storeDataUsageInBackend(ctx, objAPI, results) - err := objAPI.CrawlAndGetDataUsage(ctx, results) + bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle) + logger.LogIf(ctx, err) + err = objAPI.CrawlAndGetDataUsage(ctx, bf, results) close(results) logger.LogIf(ctx, err) + if err == nil { + // Store new cycle... + nextBloomCycle++ + if nextBloomCycle%dataUpdateTrackerResetEvery == 0 { + if intDataUpdateTracker.debug { + logger.Info(color.Green("runDataUsageInfo:") + " Resetting bloom filter for next runs.") + } + nextBloomCycle++ + } + var tmp [8]byte + binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle) + r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false) + if err != nil { + logger.LogIf(ctx, err) + continue + } + + _, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r, nil, nil), ObjectOptions{}) + logger.LogIf(ctx, err) + } } } } @@ -130,6 +169,7 @@ type folderScanner struct { getSize getSizeFn oldCache dataUsageCache newCache dataUsageCache + withFilter *bloomFilter waitForLowActiveIO func() dataUsageCrawlMult float64 @@ -164,12 +204,22 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo return nil, ctx.Err() default: } + thisHash := hashPath(folder.name) + if _, ok := f.oldCache.Cache[thisHash]; f.withFilter != nil && ok { + // If folder isn't in filter and we have data, skip it completely. + if folder.name != dataUsageRoot && !f.withFilter.containsDir(folder.name) { + f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent) + if f.dataUsageCrawlDebug { + logger.Info(color.Green("data-usage:")+" Skipping non-updated folder: %v", folder.name) + } + continue + } + } f.waitForLowActiveIO() sleepDuration(dataUsageSleepPerFolder, f.dataUsageCrawlMult) cache := dataUsageEntry{} - thisHash := hashPath(folder.name) err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error { // Parse @@ -301,11 +351,14 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai t := UTCNow() dataUsageDebug := env.Get(envDataUsageCrawlDebug, config.EnableOff) == config.EnableOn - defer func() { - if dataUsageDebug { - logger.Info(color.Green("updateUsage")+" Crawl time at %s: %v", basePath, time.Since(t)) - } - }() + logPrefix := color.Green("data-usage: ") + logSuffix := color.Blue(" - %v + %v", basePath, cache.Info.Name) + if dataUsageDebug { + defer func() { + logger.Info(logPrefix+" Crawl time: %v"+logSuffix, time.Since(t)) + }() + + } if cache.Info.Name == "" { cache.Info.Name = dataUsageRoot @@ -329,8 +382,16 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai dataUsageCrawlDebug: dataUsageDebug, } + if len(cache.Info.BloomFilter) > 0 { + s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}} + _, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter)) + if err != nil { + logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter") + s.withFilter = nil + } + } if s.dataUsageCrawlDebug { - logger.Info(color.Green("runDataUsageInfo:") + " Starting crawler master") + logger.Info(logPrefix+"Start crawling. Bloom filter: %v"+logSuffix, s.withFilter != nil) } done := ctx.Done() @@ -341,14 +402,8 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai flattenLevels-- } - var logPrefix, logSuffix string - if s.dataUsageCrawlDebug { - logPrefix = color.Green("data-usage: ") - logSuffix = color.Blue(" - %v + %v", basePath, cache.Info.Name) - } - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"Cycle: %v"+logSuffix, cache.Info.NextCycle) + logger.Info(logPrefix+"Cycle: %v, Entries: %v"+logSuffix, cache.Info.NextCycle, len(cache.Cache)) } // Always scan flattenLevels deep. Cache root is level 0. @@ -387,9 +442,10 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai continue } if du == nil { - logger.LogIf(ctx, errors.New("data-usage: no disk usage provided")) + logger.Info(logPrefix + "no disk usage provided" + logSuffix) continue } + s.newCache.replace(folder.name, "", *du) // Add to parent manually if folder.parent != nil { @@ -415,6 +471,17 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai continue } + if s.withFilter != nil { + // If folder isn't in filter, skip it completely. + if !s.withFilter.containsDir(folder.name) { + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder) + } + s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h]) + continue + } + } + // Update on this cycle... du, err := s.deepScanFolder(ctx, folder.name) if err != nil { @@ -427,7 +494,9 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai } s.newCache.replaceHashed(h, folder.parent, *du) } - + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Finished crawl, %v entries"+logSuffix, len(s.newCache.Cache)) + } s.newCache.Info.LastUpdate = UTCNow() s.newCache.Info.NextCycle++ return s.newCache, nil diff --git a/cmd/erasure-encode_test.go b/cmd/erasure-encode_test.go index d8ce6fb2a..8fbe2da70 100644 --- a/cmd/erasure-encode_test.go +++ b/cmd/erasure-encode_test.go @@ -40,6 +40,10 @@ func (a badDisk) ReadFileStream(volume, path string, offset, length int64) (io.R return nil, errFaultyDisk } +func (a badDisk) UpdateBloomFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) { + return nil, errFaultyDisk +} + func (a badDisk) CreateFile(volume, path string, size int64, reader io.Reader) error { return errFaultyDisk } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 577d0d9cd..0bea09432 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -504,6 +504,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, if _, err := fs.statBucketDir(ctx, bucket); err != nil { return oi, toObjectErr(err, bucket) } + defer ObjectPathUpdated(pathutil.Join(bucket, object)) uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) // Just check if the uploadID exists to avoid copy if it doesn't. diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 9a43eb68e..fc415ffcf 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -41,6 +42,7 @@ import ( "github.com/minio/minio/pkg/bucket/lifecycle" "github.com/minio/minio/pkg/bucket/object/tagging" "github.com/minio/minio/pkg/bucket/policy" + "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/lock" "github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/mimedb" @@ -180,6 +182,7 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { fs.fsFormatRlk = rlk go fs.cleanupStaleMultipartUploads(ctx, GlobalMultipartCleanupInterval, GlobalMultipartExpiry) + go intDataUpdateTracker.start(GlobalContext, fsPath) // Return successfully initialized object layer. return fs, nil @@ -233,7 +236,7 @@ func (fs *FSObjects) waitForLowActiveIO() { } // CrawlAndGetDataUsage returns data usage stats of the current FS deployment -func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { +func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { // Load bucket totals var oldCache dataUsageCache err := oldCache.load(ctx, fs, dataUsageCacheName) @@ -247,6 +250,15 @@ func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- Da if err != nil { return err } + oldCache.Info.BloomFilter = nil + if bf != nil { + oldCache.Info.BloomFilter = bf.bytes() + } + + if false && intDataUpdateTracker.debug { + b, _ := json.MarshalIndent(bf, "", " ") + logger.Info("Bloom filter: %v", string(b)) + } cache, err := updateUsage(ctx, fs.fsPath, oldCache, fs.waitForLowActiveIO, func(item Item) (int64, error) { // Get file size, symlinks which cannot be // followed are automatically filtered by fastwalk. @@ -256,11 +268,19 @@ func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- Da } return fi.Size(), nil }) + cache.Info.BloomFilter = nil // Even if there was an error, the new cache may have better info. if cache.Info.LastUpdate.After(oldCache.Info.LastUpdate) { + if intDataUpdateTracker.debug { + logger.Info(color.Green("CrawlAndGetDataUsage:")+" Saving cache with %d entries", len(cache.Cache)) + } logger.LogIf(ctx, cache.save(ctx, fs, dataUsageCacheName)) updates <- cache.dui(dataUsageRoot, buckets) + } else { + if intDataUpdateTracker.debug { + logger.Info(color.Green("CrawlAndGetDataUsage:")+" Cache not updated, %d entries", len(cache.Cache)) + } } return err @@ -452,6 +472,8 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelet // update metadata. func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) { cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) + defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) + if !cpSrcDstSame { objectDWLock := fs.NewNSLock(ctx, dstBucket, dstObject) if err := objectDWLock.GetLock(globalObjectTimeout); err != nil { @@ -871,6 +893,7 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string if err := checkPutObjectArgs(ctx, bucket, object, fs, r.Size()); err != nil { return ObjectInfo{}, err } + // Lock the object. objectLock := fs.NewNSLock(ctx, bucket, object) if err := objectLock.GetLock(globalObjectTimeout); err != nil { @@ -878,6 +901,7 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string return objInfo, err } defer objectLock.Unlock() + defer ObjectPathUpdated(path.Join(bucket, object)) atomic.AddInt64(&fs.activeIOCount, 1) defer func() { @@ -1036,6 +1060,8 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) er return err } + defer ObjectPathUpdated(path.Join(bucket, object)) + atomic.AddInt64(&fs.activeIOCount, 1) defer func() { atomic.AddInt64(&fs.activeIOCount, -1) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 2d61c36b7..0f004d70f 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -50,7 +50,7 @@ func NewGatewayLayerWithLocker(gwLayer ObjectLayer) ObjectLayer { type GatewayUnsupported struct{} // CrawlAndGetDataUsage - crawl is not implemented for gateway -func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { +func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { logger.CriticalIf(ctx, errors.New("not implemented")) return NotImplemented{} } diff --git a/cmd/notification.go b/cmd/notification.go index d99884931..fd177bc07 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "context" + "encoding/json" "encoding/xml" "fmt" "io" @@ -30,19 +31,18 @@ import ( "time" "github.com/klauspost/compress/zip" + "github.com/minio/minio-go/v6/pkg/set" "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/logger" - bucketsse "github.com/minio/minio/pkg/bucket/encryption" "github.com/minio/minio/pkg/bucket/lifecycle" objectlock "github.com/minio/minio/pkg/bucket/object/lock" "github.com/minio/minio/pkg/bucket/policy" - - "github.com/minio/minio-go/v6/pkg/set" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/sync/errgroup" + "github.com/willf/bloom" ) // NotificationSys - notification system. @@ -435,6 +435,75 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE return ng.Wait() } +// updateBloomFilter will cycle all servers to the current index and +// return a merged bloom filter if a complete one can be retrieved. +func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint64) (*bloomFilter, error) { + var req = bloomFilterRequest{ + Current: current, + Oldest: current - dataUsageUpdateDirCycles, + } + if current < dataUsageUpdateDirCycles { + req.Oldest = 0 + } + + // Load initial state from local... + var bf *bloomFilter + bfr, err := intDataUpdateTracker.cycleFilter(ctx, req.Oldest, req.Current) + logger.LogIf(ctx, err) + if err == nil && bfr.Complete { + nbf := intDataUpdateTracker.newBloomFilter() + bf = &nbf + _, err = bf.ReadFrom(bytes.NewBuffer(bfr.Filter)) + logger.LogIf(ctx, err) + } + + var mu sync.Mutex + g := errgroup.WithNErrs(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + g.Go(func() error { + serverBF, err := client.cycleServerBloomFilter(ctx, req) + if false && intDataUpdateTracker.debug { + b, _ := json.MarshalIndent(serverBF, "", " ") + logger.Info("Disk %v, Bloom filter: %v", client.host.Name, string(b)) + } + // Keep lock while checking result. + mu.Lock() + defer mu.Unlock() + + if err != nil || !serverBF.Complete || bf == nil { + logger.LogIf(ctx, err) + bf = nil + return nil + } + + var tmp bloom.BloomFilter + _, err = tmp.ReadFrom(bytes.NewBuffer(serverBF.Filter)) + if err != nil { + logger.LogIf(ctx, err) + bf = nil + return nil + } + if bf.BloomFilter == nil { + bf.BloomFilter = &tmp + } else { + err = bf.Merge(&tmp) + if err != nil { + logger.LogIf(ctx, err) + bf = nil + return nil + } + } + return nil + }, idx) + } + g.Wait() + return bf, nil +} + // GetLocks - makes GetLocks RPC call on all peers. func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks { locksResp := make([]*PeerLocks, len(sys.peerClients)) diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 34f859eb2..a3a0111d6 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -60,7 +60,7 @@ type ObjectLayer interface { // Storage operations. Shutdown(context.Context) error - CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error + CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error StorageInfo(ctx context.Context, local bool) StorageInfo // local queries only local disks // Bucket operations. diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 18275222c..2bc9fad26 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -524,6 +524,25 @@ func (client *peerRESTClient) RemoveBucketObjectLockConfig(bucket string) error return nil } +// cycleServerBloomFilter will cycle the bloom filter to start recording to index y if not already. +// The response will contain a bloom filter starting at index x up to, but not including index y. +// If y is 0, the response will not update y, but return the currently recorded information +// from the current x to y-1. +func (client *peerRESTClient) cycleServerBloomFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) { + var reader bytes.Buffer + err := gob.NewEncoder(&reader).Encode(req) + if err != nil { + return nil, err + } + respBody, err := client.call(peerRESTMethodCycleBloom, nil, &reader, -1) + if err != nil { + return nil, err + } + var resp bloomFilterResponse + defer http.DrainBody(respBody) + return &resp, gob.NewDecoder(respBody).Decode(&resp) +} + // SetBucketPolicy - Set bucket policy on the peer node. func (client *peerRESTClient) SetBucketPolicy(bucket string, bucketPolicy *policy.Policy) error { values := make(url.Values) diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index e1a66c538..ad5c6a659 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -53,6 +53,7 @@ const ( peerRESTMethodBucketPolicySet = "/setbucketpolicy" peerRESTMethodBucketNotificationPut = "/putbucketnotification" peerRESTMethodReloadFormat = "/reloadformat" + peerRESTMethodCycleBloom = "/cyclebloom" peerRESTMethodTrace = "/trace" peerRESTMethodListen = "/listen" peerRESTMethodLog = "/log" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 3d06afa61..0cadb4905 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -773,6 +773,30 @@ func (s *peerRESTServer) SetBucketSSEConfigHandler(w http.ResponseWriter, r *htt w.(http.Flusher).Flush() } +// CycleServerBloomFilterHandler cycles bllom filter on server. +func (s *peerRESTServer) CycleServerBloomFilterHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "CycleServerBloomFilter") + + var req bloomFilterRequest + err := gob.NewDecoder(r.Body).Decode(&req) + if err != nil { + s.writeErrorResponse(w, err) + return + } + bf, err := intDataUpdateTracker.cycleFilter(ctx, req.Oldest, req.Current) + if err != nil { + s.writeErrorResponse(w, err) + return + } + logger.LogIf(ctx, gob.NewEncoder(w).Encode(bf)) + w.(http.Flusher).Flush() +} + // PutBucketNotificationHandler - Set bucket policy. func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1124,6 +1148,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveOBDInfo).HandlerFunc(httpTraceHdrs(server.DriveOBDInfoHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetOBDInfo).HandlerFunc(httpTraceHdrs(server.NetOBDInfoHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDispatchNetOBDInfo).HandlerFunc(httpTraceHdrs(server.DispatchNetOBDInfoHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucket).HandlerFunc(httpTraceHdrs(server.DeleteBucketHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerUpdate).HandlerFunc(httpTraceHdrs(server.ServerUpdateHandler)).Queries(restQueries(peerRESTUpdateURL, peerRESTSha256Hex, peerRESTLatestRelease)...) diff --git a/cmd/server-main.go b/cmd/server-main.go index c19adb0e4..2278acf92 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -491,7 +491,6 @@ func serverMain(ctx *cli.Context) { // Initialize object layer with the supplied disks, objectLayer is nil upon any error. func newObjectLayer(ctx context.Context, endpointZones EndpointZones) (newObject ObjectLayer, err error) { // For FS only, directly use the disk. - if endpointZones.NEndpoints() == 1 { // Initialize new FS object layer. return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 3281a16ec..d66e83342 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -473,7 +473,8 @@ func (s *xlSets) StorageInfo(ctx context.Context, local bool) StorageInfo { return storageInfo } -func (s *xlSets) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { +func (s *xlSets) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { + // Use the zone-level implementation instead. return NotImplemented{} } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index dc3a04ca3..a5bcd50cf 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -42,6 +42,9 @@ func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRes // `policy.json, notification.xml, listeners.json`. func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ( result madmin.HealResultItem, err error) { + if !dryRun { + defer ObjectPathUpdated(bucket) + } storageDisks := xl.getDisks() diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 7fb5ce4b4..cb7e3761a 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -545,6 +545,8 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, return oi, toObjectErr(errFileParentIsFile, bucket, object) } + defer ObjectPathUpdated(path.Join(bucket, object)) + // Calculate s3 compatible md5sum for complete multipart. s3MD5 := getCompleteMultipartMD5(parts) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 526aa730a..98627e936 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -69,6 +69,8 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc // Check if this request is only metadata update. if cpSrcDstSame { + defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) + // Read metadata associated with the object from all disks. storageDisks := xl.getDisks() @@ -481,6 +483,7 @@ func (xl xlObjects) PutObject(ctx context.Context, bucket string, object string, // putObject wrapper for xl PutObject func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { + defer ObjectPathUpdated(path.Join(bucket, object)) data := r.Reader uniqueID := mustGetUUID() @@ -695,6 +698,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, writeQuorum int, isDir bool) error { var disks []StorageAPI var err error + defer ObjectPathUpdated(path.Join(bucket, object)) tmpObj := mustGetUUID() if bucket == minioMetaTmpBucket { @@ -755,6 +759,7 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects if errs[idx] != nil { continue } + tmpObjs[idx] = mustGetUUID() var err error // Rename the current object while requiring @@ -774,6 +779,7 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects if err != nil { errs[idx] = err } + ObjectPathUpdated(path.Join(bucket, objects[idx])) } } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 7cbe0014b..c272d10e1 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "errors" "fmt" "sort" "sync" @@ -25,6 +26,7 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bpool" + "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/dsync" "github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/sync/errgroup" @@ -200,24 +202,14 @@ func (xl xlObjects) GetMetrics(ctx context.Context) (*Metrics, error) { // CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed. // Updates are sent on a regular basis and the caller *must* consume them. -func (xl xlObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { - cache := make(chan dataUsageCache, 1) - defer close(cache) - buckets, err := xl.ListBuckets(ctx) - if err != nil { - return err - } - go func() { - for update := range cache { - updates <- update.dui(update.Info.Name, buckets) - } - }() - return xl.crawlAndGetDataUsage(ctx, buckets, cache) +func (xl xlObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { + // This should only be called from runDataUsageInfo and this setup should not happen (zones). + return errors.New("xlObjects CrawlAndGetDataUsage not implemented") } // CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed. // Updates are sent on a regular basis and the caller *must* consume them. -func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, updates chan<- dataUsageCache) error { +func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error { var disks []StorageAPI for _, d := range xl.getLoadBalancedDisks() { @@ -258,8 +250,14 @@ func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketIn for _, b := range buckets { e := oldCache.find(b.Name) if e != nil { - bucketCh <- b - cache.replace(b.Name, dataUsageRoot, *e) + if bf == nil || bf.containsDir(b.Name) { + bucketCh <- b + cache.replace(b.Name, dataUsageRoot, *e) + } else { + if intDataUpdateTracker.debug { + logger.Info(color.Green("crawlAndGetDataUsage:")+" Skipping bucket %v, not updated", b.Name) + } + } } } @@ -303,6 +301,9 @@ func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketIn cache.Info.NextCycle++ cache.Info.LastUpdate = time.Now() logger.LogIf(ctx, cache.save(ctx, xl, dataUsageCacheName)) + if intDataUpdateTracker.debug { + logger.Info(color.Green("crawlAndGetDataUsage:")+" Cache saved, Next Cycle: %d", cache.Info.NextCycle) + } updates <- cache }() @@ -339,7 +340,11 @@ func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketIn // Calc usage before := cache.Info.LastUpdate + if bf != nil { + cache.Info.BloomFilter = bf.bytes() + } cache, err = disk.CrawlAndGetDataUsage(ctx, cache) + cache.Info.BloomFilter = nil if err != nil { logger.LogIf(ctx, err) if cache.Info.LastUpdate.After(before) { diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index 18583e7a2..5c54df54d 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -64,8 +64,16 @@ func newXLZones(ctx context.Context, endpointZones EndpointZones) (ObjectLayer, storageDisks = make([][]StorageAPI, len(endpointZones)) z = &xlZones{zones: make([]*xlSets, len(endpointZones))} ) + + var localDrives []string + local := endpointZones.FirstLocal() for i, ep := range endpointZones { + for _, endpoint := range ep.Endpoints { + if endpoint.IsLocal { + localDrives = append(localDrives, endpoint.Path) + } + } storageDisks[i], formats[i], err = waitForFormatXL(local, ep.Endpoints, i+1, ep.SetCount, ep.DrivesPerSet, deploymentID) if err != nil { @@ -82,6 +90,8 @@ func newXLZones(ctx context.Context, endpointZones EndpointZones) (ObjectLayer, if !z.SingleZone() { z.quickHealBuckets(ctx) } + go intDataUpdateTracker.start(GlobalContext, localDrives...) + return z, nil } @@ -217,7 +227,7 @@ func (z *xlZones) StorageInfo(ctx context.Context, local bool) StorageInfo { return storageInfo } -func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { +func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { ctx, cancel := context.WithCancel(ctx) defer cancel() var wg sync.WaitGroup @@ -257,7 +267,7 @@ func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataU } }() // Start crawler. Blocks until done. - err := xl.crawlAndGetDataUsage(ctx, buckets, updates) + err := xl.crawlAndGetDataUsage(ctx, buckets, bf, updates) if err != nil { mu.Lock() if firstErr == nil { diff --git a/go.mod b/go.mod index 270dda83f..ce7771def 100644 --- a/go.mod +++ b/go.mod @@ -99,11 +99,14 @@ require ( github.com/sirupsen/logrus v1.5.0 github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 // indirect github.com/soheilhy/cmux v0.1.4 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 github.com/tinylib/msgp v1.1.1 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/ugorji/go v1.1.5-pre // indirect github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a + github.com/willf/bitset v1.1.10 // indirect + github.com/willf/bloom v2.0.3+incompatible github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.3 // indirect diff --git a/go.sum b/go.sum index c15d82c95..6a4366554 100644 --- a/go.sum +++ b/go.sum @@ -398,6 +398,8 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2 github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -424,6 +426,10 @@ github.com/ugorji/go/codec v1.1.5-pre h1:5YV9PsFAN+ndcCtTM7s60no7nY7eTG3LPtxhSwu github.com/ugorji/go/codec v1.1.5-pre/go.mod h1:tULtS6Gy1AE1yCENaw4Vb//HLH5njI2tfCQDUqRd8fI= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc= +github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= +github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA= +github.com/willf/bloom v2.0.3+incompatible/go.mod h1:MmAltL9pDMNTrvUkxdg0k0q5I0suxmuwp3KbyrZLOZ8= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=