diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index a3650720b..1659d5939 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "path" "time" "github.com/minio/minio/cmd/logger" @@ -88,6 +89,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { case bucket != "" && object != "": res, err = objAPI.HealObject(ctx, bucket, object, task.opts) } + ObjectPathUpdated(path.Join(bucket, object)) if task.responseCh != nil { task.responseCh <- healResult{result: res, err: err} } diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go new file mode 100644 index 000000000..4aec4381a --- /dev/null +++ b/cmd/data-update-tracker.go @@ -0,0 +1,607 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bufio" + "bytes" + "context" + "encoding/binary" + "errors" + "io" + "io/ioutil" + "os" + "path" + "sort" + "strings" + "sync" + "time" + + "github.com/minio/minio/cmd/config" + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/color" + "github.com/minio/minio/pkg/env" + "github.com/willf/bloom" +) + +const ( + // Estimate bloom filter size. With this many items + dataUpdateTrackerEstItems = 1000000 + // ... we want this false positive rate: + dataUpdateTrackerFP = 0.99 + dataUpdateTrackerQueueSize = 10000 + + dataUpdateTrackerVersion = 1 + dataUpdateTrackerFilename = minioMetaBucket + SlashSeparator + bucketMetaPrefix + SlashSeparator + ".tracker.bin" + dataUpdateTrackerSaveInterval = 5 * time.Minute + + // Reset bloom filters every n cycle + dataUpdateTrackerResetEvery = 1000 +) + +var ( + objectUpdatedCh chan<- string + intDataUpdateTracker *dataUpdateTracker +) + +func init() { + intDataUpdateTracker = newDataUpdateTracker() + objectUpdatedCh = intDataUpdateTracker.input +} + +type dataUpdateTracker struct { + mu sync.Mutex + input chan string + save chan struct{} + debug bool + saveExited chan struct{} + + Current dataUpdateFilter + History dataUpdateTrackerHistory + Saved time.Time +} + +// newDataUpdateTracker returns a dataUpdateTracker with default settings. +func newDataUpdateTracker() *dataUpdateTracker { + d := &dataUpdateTracker{ + Current: dataUpdateFilter{ + idx: 1, + }, + debug: env.Get(envDataUsageCrawlDebug, config.EnableOff) == config.EnableOn, + input: make(chan string, dataUpdateTrackerQueueSize), + save: make(chan struct{}, 1), + saveExited: make(chan struct{}), + } + d.Current.bf = d.newBloomFilter() + return d +} + +type dataUpdateTrackerHistory []dataUpdateFilter + +type dataUpdateFilter struct { + idx uint64 + bf bloomFilter +} + +type bloomFilter struct { + *bloom.BloomFilter +} + +// emptyBloomFilter returns an empty bloom filter. +func emptyBloomFilter() bloomFilter { + return bloomFilter{BloomFilter: &bloom.BloomFilter{}} +} + +// containsDir returns whether the bloom filter contains a directory. 
+// Note that objects in XL mode are also considered directories. +func (b bloomFilter) containsDir(in string) bool { + split := splitPathDeterministic(path.Clean(in)) + + if len(split) == 0 { + return false + } + var tmp [dataUsageHashLen]byte + hashPath(path.Join(split...)).bytes(tmp[:]) + return b.Test(tmp[:]) +} + +// bytes returns the bloom filter serialized as a byte slice. +func (b bloomFilter) bytes() []byte { + if b.BloomFilter == nil { + return nil + } + var buf bytes.Buffer + _, err := b.WriteTo(&buf) + if err != nil { + logger.LogIf(GlobalContext, err) + return nil + } + return buf.Bytes() +} + +// sort the dataUpdateTrackerHistory, newest first. +// Returns whether the history is complete. +func (d dataUpdateTrackerHistory) sort() bool { + if len(d) == 0 { + return true + } + sort.Slice(d, func(i, j int) bool { + return d[i].idx > d[j].idx + }) + return d[0].idx-d[len(d)-1].idx == uint64(len(d)) +} + +// removeOlderThan will remove entries older than index 'n'. +func (d *dataUpdateTrackerHistory) removeOlderThan(n uint64) { + d.sort() + dd := *d + end := len(dd) + for i := end - 1; i >= 0; i-- { + if dd[i].idx < n { + end = i + } + } + dd = dd[:end] + *d = dd +} + +// newBloomFilter returns a new bloom filter with default settings. +func (d *dataUpdateTracker) newBloomFilter() bloomFilter { + return bloomFilter{bloom.NewWithEstimates(dataUpdateTrackerEstItems, dataUpdateTrackerFP)} +} + +// current returns the current index. +func (d *dataUpdateTracker) current() uint64 { + d.mu.Lock() + defer d.mu.Unlock() + return d.Current.idx +} + +// start will load the current data from the drives start collecting information and +// start a saver goroutine. +// All of these will exit when the context is canceled. +func (d *dataUpdateTracker) start(ctx context.Context, drives ...string) { + if len(drives) <= 0 { + logger.LogIf(ctx, errors.New("dataUpdateTracker.start: No drives specified")) + return + } + d.load(ctx, drives...) + go d.startCollector(ctx) + go d.startSaver(ctx, dataUpdateTrackerSaveInterval, drives) +} + +// load will attempt to load data tracking information from the supplied drives. +// The data will only be loaded if d.Saved is older than the one found on disk. +// The newest working cache will be kept in d. +// If no valid data usage tracker can be found d will remain unchanged. +// If object is shared the caller should lock it. +func (d *dataUpdateTracker) load(ctx context.Context, drives ...string) { + if len(drives) <= 0 { + logger.LogIf(ctx, errors.New("dataUpdateTracker.load: No drives specified")) + return + } + for _, drive := range drives { + cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename) + f, err := os.Open(cacheFormatPath) + if err != nil { + if os.IsNotExist(err) { + continue + } + logger.LogIf(ctx, err) + continue + } + err = d.deserialize(f, d.Saved) + if err != nil { + logger.LogIf(ctx, err) + } + f.Close() + } +} + +// startSaver will start a saver that will write d to all supplied drives at specific intervals. +// The saver will save and exit when supplied context is closed. 
+func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives []string) { + t := time.NewTicker(interval) + defer t.Stop() + var buf bytes.Buffer + d.mu.Lock() + saveNow := d.save + exited := make(chan struct{}) + d.saveExited = exited + d.mu.Unlock() + defer close(exited) + for { + var exit bool + select { + case <-ctx.Done(): + exit = true + case <-t.C: + case <-saveNow: + } + buf.Reset() + d.mu.Lock() + d.Saved = UTCNow() + err := d.serialize(&buf) + if d.debug { + logger.Info(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v", buf.Len(), d.Current.idx) + } + d.mu.Unlock() + if err != nil { + logger.LogIf(ctx, err, "Error serializing usage tracker data") + if exit { + return + } + continue + } + if buf.Len() == 0 { + logger.LogIf(ctx, errors.New("zero sized output, skipping save")) + continue + } + for _, drive := range drives { + cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename) + err := ioutil.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm) + if err != nil { + logger.LogIf(ctx, err) + continue + } + } + if exit { + return + } + } +} + +// serialize all data in d to dst. +// Caller should hold lock if d is expected to be shared. +// If an error is returned, there will likely be partial data written to dst. +func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) { + ctx := GlobalContext + var tmp [8]byte + o := bufio.NewWriter(dst) + defer func() { + if err == nil { + err = o.Flush() + } + }() + + // Version + if err := o.WriteByte(dataUpdateTrackerVersion); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + // Timestamp. + binary.LittleEndian.PutUint64(tmp[:], uint64(d.Saved.Unix())) + if _, err := o.Write(tmp[:]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + // Current + binary.LittleEndian.PutUint64(tmp[:], d.Current.idx) + if _, err := o.Write(tmp[:]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + if _, err := d.Current.bf.WriteTo(o); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + // History + binary.LittleEndian.PutUint64(tmp[:], uint64(len(d.History))) + if _, err := o.Write(tmp[:]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + for _, bf := range d.History { + // Current + binary.LittleEndian.PutUint64(tmp[:], bf.idx) + if _, err := o.Write(tmp[:]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + if _, err := bf.bf.WriteTo(o); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + } + return nil +} + +// deserialize will deserialize the supplied input if the input is newer than the supplied time. +func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) error { + ctx := GlobalContext + var dst dataUpdateTracker + var tmp [8]byte + + // Version + if _, err := io.ReadFull(src, tmp[:1]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + switch tmp[0] { + case dataUpdateTrackerVersion: + default: + return errors.New("dataUpdateTracker: Unknown data version") + } + // Timestamp. 
+ if _, err := io.ReadFull(src, tmp[:8]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + t := time.Unix(int64(binary.LittleEndian.Uint64(tmp[:])), 0) + if !t.After(newerThan) { + return nil + } + + // Current + if _, err := io.ReadFull(src, tmp[:8]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + dst.Current.idx = binary.LittleEndian.Uint64(tmp[:]) + dst.Current.bf = emptyBloomFilter() + if _, err := dst.Current.bf.ReadFrom(src); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + + // History + if _, err := io.ReadFull(src, tmp[:8]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + n := binary.LittleEndian.Uint64(tmp[:]) + dst.History = make(dataUpdateTrackerHistory, int(n)) + for i, e := range dst.History { + if _, err := io.ReadFull(src, tmp[:8]); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + e.idx = binary.LittleEndian.Uint64(tmp[:]) + e.bf = emptyBloomFilter() + if _, err := e.bf.ReadFrom(src); err != nil { + if d.debug { + logger.LogIf(ctx, err) + } + return err + } + dst.History[i] = e + } + // Ignore what remains on the stream. + // Update d: + d.Current = dst.Current + d.History = dst.History + d.Saved = dst.Saved + return nil +} + +// start a collector that picks up entries from objectUpdatedCh +// and adds them to the current bloom filter. +func (d *dataUpdateTracker) startCollector(ctx context.Context) { + var tmp [dataUsageHashLen]byte + for { + select { + case <-ctx.Done(): + return + case in := <-d.input: + bucket, _ := path2BucketObjectWithBasePath("", in) + if bucket == "" { + if d.debug && len(in) > 0 { + logger.Info(color.Green("data-usage:")+" no bucket (%s)", in) + } + continue + } + + if isReservedOrInvalidBucket(bucket, false) { + if false && d.debug { + logger.Info(color.Green("data-usage:")+" isReservedOrInvalidBucket: %v, entry: %v", bucket, in) + } + continue + } + split := splitPathDeterministic(in) + + // Add all paths until level 3. + d.mu.Lock() + for i := range split { + if d.debug && false { + logger.Info(color.Green("dataUpdateTracker:") + " Marking path dirty: " + color.Blue(path.Join(split[:i+1]...))) + } + hashPath(path.Join(split[:i+1]...)).bytes(tmp[:]) + d.Current.bf.Add(tmp[:]) + } + d.mu.Unlock() + } + } +} + +// find entry with specified index. +// Returns nil if not found. +func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter { + for _, f := range d { + if f.idx == idx { + return &f + } + } + return nil +} + +// filterFrom will return a combined bloom filter. +func (d *dataUpdateTracker) filterFrom(ctx context.Context, oldest, newest uint64) *bloomFilterResponse { + bf := d.newBloomFilter() + bfr := bloomFilterResponse{ + OldestIdx: oldest, + CurrentIdx: d.Current.idx, + Complete: true, + } + // Loop through each index requested. + for idx := oldest; idx <= newest; idx++ { + v := d.History.find(idx) + if v == nil { + if d.Current.idx == idx { + // Merge current. 
+ err := bf.Merge(d.Current.bf.BloomFilter) + logger.LogIf(ctx, err) + if err != nil { + bfr.Complete = false + } + continue + } + bfr.Complete = false + bfr.OldestIdx = idx + 1 + continue + } + + err := bf.Merge(v.bf.BloomFilter) + if err != nil { + bfr.Complete = false + logger.LogIf(ctx, err) + continue + } + bfr.NewestIdx = idx + } + var dst bytes.Buffer + _, err := bf.WriteTo(&dst) + if err != nil { + logger.LogIf(ctx, err) + return nil + } + bfr.Filter = dst.Bytes() + + return &bfr +} + +// cycleFilter will cycle the bloom filter to start recording to index y if not already. +// The response will contain a bloom filter starting at index x up to, but not including index y. +// If y is 0, the response will not update y, but return the currently recorded information +// from the up until and including current y. +func (d *dataUpdateTracker) cycleFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) { + d.mu.Lock() + defer d.mu.Unlock() + + if current == 0 { + if len(d.History) == 0 { + return d.filterFrom(ctx, d.Current.idx, d.Current.idx), nil + } + d.History.sort() + return d.filterFrom(ctx, d.History[len(d.History)-1].idx, d.Current.idx), nil + } + + // Move current to history if new one requested + if d.Current.idx != current { + if d.debug { + logger.Info(color.Green("dataUpdateTracker:")+" cycle bloom filter: %v -> %v", d.Current.idx, current) + } + + d.History = append(d.History, d.Current) + d.Current.idx = current + d.Current.bf = d.newBloomFilter() + select { + case d.save <- struct{}{}: + default: + } + } + d.History.removeOlderThan(oldest) + return d.filterFrom(ctx, oldest, current), nil +} + +// splitPathDeterministic will split the provided relative path +// deterministically and return up to the first 3 elements of the path. +// Slash and dot prefixes are removed. +// Trailing slashes are removed. +// Returns 0 length if no parts are found after trimming. +func splitPathDeterministic(in string) []string { + split := strings.Split(in, SlashSeparator) + + // Trim empty start/end + for len(split) > 0 { + if len(split[0]) > 0 && split[0] != "." { + break + } + split = split[1:] + } + for len(split) > 0 { + if len(split[len(split)-1]) > 0 { + break + } + split = split[:len(split)-1] + } + + // Return up to 3 parts. + if len(split) > 3 { + split = split[:3] + } + return split +} + +// bloomFilterRequest request bloom filters. +// Current index will be updated to current and entries back to Oldest is returned. +type bloomFilterRequest struct { + Oldest uint64 + Current uint64 +} + +type bloomFilterResponse struct { + // Current index being written to. + CurrentIdx uint64 + // Oldest index in the returned bloom filter. + OldestIdx uint64 + // Newest Index in the returned bloom filter. + NewestIdx uint64 + // Are all indexes between oldest and newest filled? + Complete bool + // Binary data of the bloom filter. + Filter []byte +} + +// ObjectPathUpdated indicates a path has been updated. +// The function will never block. +func ObjectPathUpdated(s string) { + select { + case objectUpdatedCh <- s: + default: + } +} diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go new file mode 100644 index 000000000..9af0c9d11 --- /dev/null +++ b/cmd/data-update-tracker_test.go @@ -0,0 +1,262 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "math/rand" + "os" + "path" + "path/filepath" + "sync" + "testing" + + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/cmd/logger/message/log" +) + +type testLoggerI interface { + Helper() + Log(args ...interface{}) +} + +type testingLogger struct { + mu sync.Mutex + t testLoggerI +} + +func (t *testingLogger) Send(entry interface{}, errKind string) error { + t.mu.Lock() + defer t.mu.Unlock() + if t.t == nil { + return nil + } + e, ok := entry.(log.Entry) + if !ok { + return fmt.Errorf("unexpected log entry structure %#v", entry) + } + + t.t.Helper() + t.t.Log(e.Level, ":", errKind, e.Message) + return nil +} + +func addTestingLogging(t testLoggerI) func() { + tl := &testingLogger{t: t} + logger.AddTarget(tl) + return func() { + tl.mu.Lock() + defer tl.mu.Unlock() + tl.t = nil + } +} + +func TestDataUpdateTracker(t *testing.T) { + dut := newDataUpdateTracker() + // Change some defaults. + dut.debug = testing.Verbose() + dut.input = make(chan string) + dut.save = make(chan struct{}) + + defer addTestingLogging(t)() + + dut.Current.bf = dut.newBloomFilter() + + tmpDir, err := ioutil.TempDir("", "TestDataUpdateTracker") + if err != nil { + t.Fatal(err) + } + err = os.MkdirAll(filepath.Dir(filepath.Join(tmpDir, dataUpdateTrackerFilename)), os.ModePerm) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dut.start(ctx, tmpDir) + + var tests = []struct { + in string + check []string // if not empty, check against these instead. + exist bool + }{ + { + in: "bucket/directory/file.txt", + check: []string{"bucket", "bucket/", "/bucket", "bucket/directory", "bucket/directory/", "bucket/directory/file.txt", "/bucket/directory/file.txt"}, + exist: true, + }, + { + // System bucket + in: ".minio.sys/ignoreme/pls", + exist: false, + }, + { + // Not a valid bucket + in: "./bucket/okfile.txt", + check: []string{"./bucket/okfile.txt", "/bucket/okfile.txt", "bucket/okfile.txt"}, + exist: false, + }, + { + // Not a valid bucket + in: "æ/okfile.txt", + check: []string{"æ/okfile.txt", "æ/okfile.txt", "æ"}, + exist: false, + }, + { + in: "/bucket2/okfile2.txt", + check: []string{"./bucket2/okfile2.txt", "/bucket2/okfile2.txt", "bucket2/okfile2.txt", "bucket2"}, + exist: true, + }, + { + in: "/bucket3/prefix/okfile2.txt", + check: []string{"./bucket3/prefix/okfile2.txt", "/bucket3/prefix/okfile2.txt", "bucket3/prefix/okfile2.txt", "bucket3/prefix", "bucket3"}, + exist: true, + }, + } + for _, tt := range tests { + t.Run(tt.in, func(t *testing.T) { + dut.input <- tt.in + dut.input <- "" // Sending empty string ensures the previous is added to filter. + dut.mu.Lock() + defer dut.mu.Unlock() + if len(tt.check) == 0 { + got := dut.Current.bf.containsDir(tt.in) + if got != tt.exist { + // For unlimited tests this could lead to false positives, + // but it should be deterministic. 
+ t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist) + } + return + } + for _, check := range tt.check { + got := dut.Current.bf.containsDir(check) + if got != tt.exist { + // For unlimited tests this could lead to false positives, + // but it should be deterministic. + t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist) + } + continue + } + }) + } + // Cycle to history + _, err = dut.cycleFilter(ctx, 1, 2) + if err != nil { + t.Fatal(err) + } + dut.input <- "cycle2/file.txt" + dut.input <- "" // Sending empty string ensures the previous is added to filter. + + tests = append(tests, struct { + in string + check []string + exist bool + }{in: "cycle2/file.txt", exist: true}) + + // Shut down + cancel() + <-dut.saveExited + + if dut.current() != 2 { + t.Fatal("wrong current idx after save. want 2, got:", dut.current()) + } + + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + // Reload... + dut = newDataUpdateTracker() + dut.start(ctx, tmpDir) + + if dut.current() != 2 { + t.Fatal("current idx after load not preserved. want 2, got:", dut.current()) + } + bfr2, err := dut.cycleFilter(ctx, 1, 3) + if err != nil { + t.Fatal(err) + } + if !bfr2.Complete { + t.Fatal("Wanted complete, didn't get it") + } + if bfr2.CurrentIdx != 3 { + t.Fatal("wanted index 3, got", bfr2.CurrentIdx) + } + if bfr2.OldestIdx != 1 { + t.Fatal("wanted oldest index 3, got", bfr2.OldestIdx) + } + + // Rerun test with returned bfr2 + bf := dut.newBloomFilter() + _, err = bf.ReadFrom(bytes.NewBuffer(bfr2.Filter)) + if err != nil { + t.Fatal(err) + } + for _, tt := range tests { + t.Run(tt.in+"-reloaded", func(t *testing.T) { + if len(tt.check) == 0 { + got := bf.containsDir(tt.in) + if got != tt.exist { + // For unlimited tests this could lead to false positives, + // but it should be deterministic. + t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist) + } + return + } + for _, check := range tt.check { + got := bf.containsDir(check) + if got != tt.exist { + // For unlimited tests this could lead to false positives, + // but it should be deterministic. + t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist) + } + continue + } + }) + } +} + +func BenchmarkDataUpdateTracker(b *testing.B) { + dut := newDataUpdateTracker() + // Change some defaults. + dut.debug = false + dut.input = make(chan string) + dut.save = make(chan struct{}) + + defer addTestingLogging(b)() + + dut.Current.bf = dut.newBloomFilter() + // We do this unbuffered. This will very significantly reduce throughput, so this is a worst case. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go dut.startCollector(ctx) + input := make([]string, 1000) + rng := rand.New(rand.NewSource(0xabad1dea)) + tmp := []string{"bucket", "aprefix", "nextprefixlevel", "maybeobjname", "evendeeper", "ok-one-morelevel", "final.object"} + for i := range input { + tmp := tmp[:1+rng.Intn(cap(tmp)-1)] + input[i] = path.Join(tmp...) + } + b.SetBytes(1) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + dut.input <- input[rng.Intn(len(input))] + } +} diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index 09df4742a..39616e36a 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "io" "path" @@ -62,9 +63,10 @@ type dataUsageEntryInfo struct { type dataUsageCacheInfo struct { // Name of the bucket. Also root element. 
- Name string - LastUpdate time.Time - NextCycle uint8 + Name string + LastUpdate time.Time + NextCycle uint32 + BloomFilter []byte `msg:"BloomFilter,omitempty"` } // merge other data usage entry into this, excluding children. @@ -77,8 +79,8 @@ func (e *dataUsageEntry) merge(other dataUsageEntry) { } // mod returns true if the hash mod cycles == cycle. -func (h dataUsageHash) mod(cycle uint8, cycles uint8) bool { - return uint8(h)%cycles == cycle%cycles +func (h dataUsageHash) mod(cycle uint32, cycles uint32) bool { + return uint32(h)%cycles == cycle%cycles } // addChildString will add a child based on its name. @@ -110,6 +112,7 @@ func (d *dataUsageCache) find(path string) *dataUsageEntry { } // dui converts the flattened version of the path to DataUsageInfo. +// As a side effect d will be flattened, use a clone if this is not ok. func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo { e := d.find(path) if e == nil { @@ -158,6 +161,32 @@ func (d *dataUsageCache) replaceHashed(hash dataUsageHash, parent *dataUsageHash } } +// copyWithChildren will copy entry with hash from src if it exists along with any children. +// If a parent is specified it will be added to that if not already there. +// If the parent does not exist, it will be added. +func (d *dataUsageCache) copyWithChildren(src *dataUsageCache, hash dataUsageHash, parent *dataUsageHash) { + if d.Cache == nil { + d.Cache = make(map[dataUsageHash]dataUsageEntry, 100) + } + e, ok := src.Cache[hash] + if !ok { + return + } + d.Cache[hash] = e + for ch := range e.Children { + if ch == hash { + logger.LogIf(GlobalContext, errors.New("dataUsageCache.copyWithChildren: Circular reference")) + return + } + d.copyWithChildren(src, ch, &hash) + } + if parent != nil { + p := d.Cache[*parent] + p.addChild(hash) + d.Cache[*parent] = p + } +} + // StringAll returns a detailed string representation of all entries in the cache. func (d *dataUsageCache) StringAll() string { s := fmt.Sprintf("info:%+v\n", d.Info) @@ -167,6 +196,12 @@ func (d *dataUsageCache) StringAll() string { return strings.TrimSpace(s) } +// insert the hash into dst. +// dst must be at least dataUsageHashLen bytes long. +func (h dataUsageHash) bytes(dst []byte) { + binary.LittleEndian.PutUint64(dst, uint64(h)) +} + // String returns a human readable representation of the string. 
func (h dataUsageHash) String() string { return fmt.Sprintf("%x", uint64(h)) @@ -297,7 +332,7 @@ func (d *dataUsageCache) load(ctx context.Context, store ObjectLayer, name strin var buf bytes.Buffer err := store.GetObject(ctx, dataUsageBucket, name, 0, -1, &buf, "", ObjectOptions{}) if err != nil { - if !isErrObjectNotFound(err) { + if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) { return toObjectErr(err, dataUsageBucket, name) } *d = dataUsageCache{} diff --git a/cmd/data-usage-cache_gen.go b/cmd/data-usage-cache_gen.go index 0434254a1..2c4209cd7 100644 --- a/cmd/data-usage-cache_gen.go +++ b/cmd/data-usage-cache_gen.go @@ -37,11 +37,17 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) { return } case "NextCycle": - z.NextCycle, err = dc.ReadUint8() + z.NextCycle, err = dc.ReadUint32() if err != nil { err = msgp.WrapError(err, "NextCycle") return } + case "BloomFilter": + z.BloomFilter, err = dc.ReadBytes(z.BloomFilter) + if err != nil { + err = msgp.WrapError(err, "BloomFilter") + return + } default: err = dc.Skip() if err != nil { @@ -54,10 +60,24 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 3 +func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { + // omitempty: check for empty values + zb0001Len := uint32(4) + var zb0001Mask uint8 /* 4 bits */ + if z.BloomFilter == nil { + zb0001Len-- + zb0001Mask |= 0x8 + } + // variable map header, size zb0001Len + err = en.Append(0x80 | uint8(zb0001Len)) + if err != nil { + return + } + if zb0001Len == 0 { + return + } // write "Name" - err = en.Append(0x83, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + err = en.Append(0xa4, 0x4e, 0x61, 0x6d, 0x65) if err != nil { return } @@ -81,27 +101,55 @@ func (z dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - err = en.WriteUint8(z.NextCycle) + err = en.WriteUint32(z.NextCycle) if err != nil { err = msgp.WrapError(err, "NextCycle") return } + if (zb0001Mask & 0x8) == 0 { // if not empty + // write "BloomFilter" + err = en.Append(0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72) + if err != nil { + return + } + err = en.WriteBytes(z.BloomFilter) + if err != nil { + err = msgp.WrapError(err, "BloomFilter") + return + } + } return } // MarshalMsg implements msgp.Marshaler -func (z dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) { +func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 3 + // omitempty: check for empty values + zb0001Len := uint32(4) + var zb0001Mask uint8 /* 4 bits */ + if z.BloomFilter == nil { + zb0001Len-- + zb0001Mask |= 0x8 + } + // variable map header, size zb0001Len + o = append(o, 0x80|uint8(zb0001Len)) + if zb0001Len == 0 { + return + } // string "Name" - o = append(o, 0x83, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.Name) // string "LastUpdate" o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) o = msgp.AppendTime(o, z.LastUpdate) // string "NextCycle" o = append(o, 0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65) - o = msgp.AppendUint8(o, z.NextCycle) + o = msgp.AppendUint32(o, z.NextCycle) + if (zb0001Mask & 0x8) == 0 { // if not empty + // string "BloomFilter" + o = append(o, 0xab, 0x42, 0x6c, 0x6f, 0x6f, 0x6d, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72) 
+ o = msgp.AppendBytes(o, z.BloomFilter) + } return } @@ -136,11 +184,17 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { return } case "NextCycle": - z.NextCycle, bts, err = msgp.ReadUint8Bytes(bts) + z.NextCycle, bts, err = msgp.ReadUint32Bytes(bts) if err != nil { err = msgp.WrapError(err, "NextCycle") return } + case "BloomFilter": + z.BloomFilter, bts, err = msgp.ReadBytesBytes(bts, z.BloomFilter) + if err != nil { + err = msgp.WrapError(err, "BloomFilter") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -154,8 +208,8 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z dataUsageCacheInfo) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint8Size +func (z *dataUsageCacheInfo) Msgsize() (s int) { + s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint32Size + 12 + msgp.BytesPrefixSize + len(z.BloomFilter) return } diff --git a/cmd/data-usage.go b/cmd/data-usage.go index 9b9e95e83..a4c115042 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "context" + "encoding/binary" "encoding/json" "errors" "os" @@ -32,6 +33,7 @@ import ( "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/hash" + "github.com/willf/bloom" ) const ( @@ -45,6 +47,7 @@ const ( dataUsageUpdateDirCycles = 16 dataUsageRoot = SlashSeparator dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix + dataUsageBloomName = ".bloomcycle.bin" dataUsageStartDelay = 5 * time.Minute // Time to wait on startup and between cycles. ) @@ -56,6 +59,20 @@ func initDataUsageStats(ctx context.Context, objAPI ObjectLayer) { } func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) { + // Load current bloom cycle + nextBloomCycle := intDataUpdateTracker.current() + 1 + var buf bytes.Buffer + err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageBloomName, 0, -1, &buf, "", ObjectOptions{}) + if err != nil { + if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) { + logger.LogIf(ctx, err) + } + } else { + if buf.Len() == 8 { + nextBloomCycle = binary.LittleEndian.Uint64(buf.Bytes()) + } + } + for { select { case <-ctx.Done(): @@ -64,9 +81,31 @@ func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) { // Wait before starting next cycle and wait on startup. results := make(chan DataUsageInfo, 1) go storeDataUsageInBackend(ctx, objAPI, results) - err := objAPI.CrawlAndGetDataUsage(ctx, results) + bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle) + logger.LogIf(ctx, err) + err = objAPI.CrawlAndGetDataUsage(ctx, bf, results) close(results) logger.LogIf(ctx, err) + if err == nil { + // Store new cycle... 
+ nextBloomCycle++ + if nextBloomCycle%dataUpdateTrackerResetEvery == 0 { + if intDataUpdateTracker.debug { + logger.Info(color.Green("runDataUsageInfo:") + " Resetting bloom filter for next runs.") + } + nextBloomCycle++ + } + var tmp [8]byte + binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle) + r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false) + if err != nil { + logger.LogIf(ctx, err) + continue + } + + _, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r, nil, nil), ObjectOptions{}) + logger.LogIf(ctx, err) + } } } } @@ -130,6 +169,7 @@ type folderScanner struct { getSize getSizeFn oldCache dataUsageCache newCache dataUsageCache + withFilter *bloomFilter waitForLowActiveIO func() dataUsageCrawlMult float64 @@ -164,12 +204,22 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo return nil, ctx.Err() default: } + thisHash := hashPath(folder.name) + if _, ok := f.oldCache.Cache[thisHash]; f.withFilter != nil && ok { + // If folder isn't in filter and we have data, skip it completely. + if folder.name != dataUsageRoot && !f.withFilter.containsDir(folder.name) { + f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent) + if f.dataUsageCrawlDebug { + logger.Info(color.Green("data-usage:")+" Skipping non-updated folder: %v", folder.name) + } + continue + } + } f.waitForLowActiveIO() sleepDuration(dataUsageSleepPerFolder, f.dataUsageCrawlMult) cache := dataUsageEntry{} - thisHash := hashPath(folder.name) err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error { // Parse @@ -301,11 +351,14 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai t := UTCNow() dataUsageDebug := env.Get(envDataUsageCrawlDebug, config.EnableOff) == config.EnableOn - defer func() { - if dataUsageDebug { - logger.Info(color.Green("updateUsage")+" Crawl time at %s: %v", basePath, time.Since(t)) - } - }() + logPrefix := color.Green("data-usage: ") + logSuffix := color.Blue(" - %v + %v", basePath, cache.Info.Name) + if dataUsageDebug { + defer func() { + logger.Info(logPrefix+" Crawl time: %v"+logSuffix, time.Since(t)) + }() + + } if cache.Info.Name == "" { cache.Info.Name = dataUsageRoot @@ -329,8 +382,16 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai dataUsageCrawlDebug: dataUsageDebug, } + if len(cache.Info.BloomFilter) > 0 { + s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}} + _, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter)) + if err != nil { + logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter") + s.withFilter = nil + } + } if s.dataUsageCrawlDebug { - logger.Info(color.Green("runDataUsageInfo:") + " Starting crawler master") + logger.Info(logPrefix+"Start crawling. Bloom filter: %v"+logSuffix, s.withFilter != nil) } done := ctx.Done() @@ -341,14 +402,8 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai flattenLevels-- } - var logPrefix, logSuffix string - if s.dataUsageCrawlDebug { - logPrefix = color.Green("data-usage: ") - logSuffix = color.Blue(" - %v + %v", basePath, cache.Info.Name) - } - if s.dataUsageCrawlDebug { - logger.Info(logPrefix+"Cycle: %v"+logSuffix, cache.Info.NextCycle) + logger.Info(logPrefix+"Cycle: %v, Entries: %v"+logSuffix, cache.Info.NextCycle, len(cache.Cache)) } // Always scan flattenLevels deep. Cache root is level 0. 
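The skip decision added in the hunks above reduces to a single containsDir test per cached folder: a folder already present in the old cache is only re-crawled if its path tests positive in the bloom filter built from ObjectPathUpdated events. The standalone Go sketch below illustrates that decision with the same github.com/willf/bloom package; markUpdated, shouldCrawl, the plain string keys and the filter sizing are illustrative assumptions only — the patch itself hashes each path prefix with its internal hashPath and sizes the filter via dataUpdateTrackerEstItems/dataUpdateTrackerFP.

package main

import (
	"fmt"
	"path"
	"strings"

	"github.com/willf/bloom"
)

// markUpdated records an object update by adding every path prefix
// (capped at three levels, as splitPathDeterministic does) to the filter.
func markUpdated(bf *bloom.BloomFilter, object string) {
	parts := strings.Split(strings.Trim(object, "/"), "/")
	if len(parts) > 3 {
		parts = parts[:3]
	}
	for i := range parts {
		bf.Add([]byte(path.Join(parts[:i+1]...)))
	}
}

// shouldCrawl reports whether a cached folder needs re-crawling,
// i.e. whether anything below it was marked as updated.
func shouldCrawl(bf *bloom.BloomFilter, folder string) bool {
	return bf.Test([]byte(strings.Trim(folder, "/")))
}

func main() {
	bf := bloom.NewWithEstimates(1000000, 0.01) // illustrative sizing only
	markUpdated(bf, "bucket/prefix/object.txt")

	fmt.Println(shouldCrawl(bf, "bucket"))        // true: re-crawl
	fmt.Println(shouldCrawl(bf, "bucket/prefix")) // true: re-crawl
	fmt.Println(shouldCrawl(bf, "otherbucket"))   // false: reuse the old cache entry
}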
@@ -387,9 +442,10 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai continue } if du == nil { - logger.LogIf(ctx, errors.New("data-usage: no disk usage provided")) + logger.Info(logPrefix + "no disk usage provided" + logSuffix) continue } + s.newCache.replace(folder.name, "", *du) // Add to parent manually if folder.parent != nil { @@ -415,6 +471,17 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai continue } + if s.withFilter != nil { + // If folder isn't in filter, skip it completely. + if !s.withFilter.containsDir(folder.name) { + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder) + } + s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h]) + continue + } + } + // Update on this cycle... du, err := s.deepScanFolder(ctx, folder.name) if err != nil { @@ -427,7 +494,9 @@ func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, wai } s.newCache.replaceHashed(h, folder.parent, *du) } - + if s.dataUsageCrawlDebug { + logger.Info(logPrefix+"Finished crawl, %v entries"+logSuffix, len(s.newCache.Cache)) + } s.newCache.Info.LastUpdate = UTCNow() s.newCache.Info.NextCycle++ return s.newCache, nil diff --git a/cmd/erasure-encode_test.go b/cmd/erasure-encode_test.go index d8ce6fb2a..8fbe2da70 100644 --- a/cmd/erasure-encode_test.go +++ b/cmd/erasure-encode_test.go @@ -40,6 +40,10 @@ func (a badDisk) ReadFileStream(volume, path string, offset, length int64) (io.R return nil, errFaultyDisk } +func (a badDisk) UpdateBloomFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) { + return nil, errFaultyDisk +} + func (a badDisk) CreateFile(volume, path string, size int64, reader io.Reader) error { return errFaultyDisk } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 577d0d9cd..0bea09432 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -504,6 +504,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, if _, err := fs.statBucketDir(ctx, bucket); err != nil { return oi, toObjectErr(err, bucket) } + defer ObjectPathUpdated(pathutil.Join(bucket, object)) uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) // Just check if the uploadID exists to avoid copy if it doesn't. diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 9a43eb68e..fc415ffcf 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -41,6 +42,7 @@ import ( "github.com/minio/minio/pkg/bucket/lifecycle" "github.com/minio/minio/pkg/bucket/object/tagging" "github.com/minio/minio/pkg/bucket/policy" + "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/lock" "github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/mimedb" @@ -180,6 +182,7 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { fs.fsFormatRlk = rlk go fs.cleanupStaleMultipartUploads(ctx, GlobalMultipartCleanupInterval, GlobalMultipartExpiry) + go intDataUpdateTracker.start(GlobalContext, fsPath) // Return successfully initialized object layer. 
return fs, nil @@ -233,7 +236,7 @@ func (fs *FSObjects) waitForLowActiveIO() { } // CrawlAndGetDataUsage returns data usage stats of the current FS deployment -func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { +func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { // Load bucket totals var oldCache dataUsageCache err := oldCache.load(ctx, fs, dataUsageCacheName) @@ -247,6 +250,15 @@ func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- Da if err != nil { return err } + oldCache.Info.BloomFilter = nil + if bf != nil { + oldCache.Info.BloomFilter = bf.bytes() + } + + if false && intDataUpdateTracker.debug { + b, _ := json.MarshalIndent(bf, "", " ") + logger.Info("Bloom filter: %v", string(b)) + } cache, err := updateUsage(ctx, fs.fsPath, oldCache, fs.waitForLowActiveIO, func(item Item) (int64, error) { // Get file size, symlinks which cannot be // followed are automatically filtered by fastwalk. @@ -256,11 +268,19 @@ func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- Da } return fi.Size(), nil }) + cache.Info.BloomFilter = nil // Even if there was an error, the new cache may have better info. if cache.Info.LastUpdate.After(oldCache.Info.LastUpdate) { + if intDataUpdateTracker.debug { + logger.Info(color.Green("CrawlAndGetDataUsage:")+" Saving cache with %d entries", len(cache.Cache)) + } logger.LogIf(ctx, cache.save(ctx, fs, dataUsageCacheName)) updates <- cache.dui(dataUsageRoot, buckets) + } else { + if intDataUpdateTracker.debug { + logger.Info(color.Green("CrawlAndGetDataUsage:")+" Cache not updated, %d entries", len(cache.Cache)) + } } return err @@ -452,6 +472,8 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelet // update metadata. func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) { cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) + defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) + if !cpSrcDstSame { objectDWLock := fs.NewNSLock(ctx, dstBucket, dstObject) if err := objectDWLock.GetLock(globalObjectTimeout); err != nil { @@ -871,6 +893,7 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string if err := checkPutObjectArgs(ctx, bucket, object, fs, r.Size()); err != nil { return ObjectInfo{}, err } + // Lock the object. 
objectLock := fs.NewNSLock(ctx, bucket, object) if err := objectLock.GetLock(globalObjectTimeout); err != nil { @@ -878,6 +901,7 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string return objInfo, err } defer objectLock.Unlock() + defer ObjectPathUpdated(path.Join(bucket, object)) atomic.AddInt64(&fs.activeIOCount, 1) defer func() { @@ -1036,6 +1060,8 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) er return err } + defer ObjectPathUpdated(path.Join(bucket, object)) + atomic.AddInt64(&fs.activeIOCount, 1) defer func() { atomic.AddInt64(&fs.activeIOCount, -1) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 2d61c36b7..0f004d70f 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -50,7 +50,7 @@ func NewGatewayLayerWithLocker(gwLayer ObjectLayer) ObjectLayer { type GatewayUnsupported struct{} // CrawlAndGetDataUsage - crawl is not implemented for gateway -func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { +func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { logger.CriticalIf(ctx, errors.New("not implemented")) return NotImplemented{} } diff --git a/cmd/notification.go b/cmd/notification.go index d99884931..fd177bc07 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "context" + "encoding/json" "encoding/xml" "fmt" "io" @@ -30,19 +31,18 @@ import ( "time" "github.com/klauspost/compress/zip" + "github.com/minio/minio-go/v6/pkg/set" "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/logger" - bucketsse "github.com/minio/minio/pkg/bucket/encryption" "github.com/minio/minio/pkg/bucket/lifecycle" objectlock "github.com/minio/minio/pkg/bucket/object/lock" "github.com/minio/minio/pkg/bucket/policy" - - "github.com/minio/minio-go/v6/pkg/set" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/sync/errgroup" + "github.com/willf/bloom" ) // NotificationSys - notification system. @@ -435,6 +435,75 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE return ng.Wait() } +// updateBloomFilter will cycle all servers to the current index and +// return a merged bloom filter if a complete one can be retrieved. +func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint64) (*bloomFilter, error) { + var req = bloomFilterRequest{ + Current: current, + Oldest: current - dataUsageUpdateDirCycles, + } + if current < dataUsageUpdateDirCycles { + req.Oldest = 0 + } + + // Load initial state from local... + var bf *bloomFilter + bfr, err := intDataUpdateTracker.cycleFilter(ctx, req.Oldest, req.Current) + logger.LogIf(ctx, err) + if err == nil && bfr.Complete { + nbf := intDataUpdateTracker.newBloomFilter() + bf = &nbf + _, err = bf.ReadFrom(bytes.NewBuffer(bfr.Filter)) + logger.LogIf(ctx, err) + } + + var mu sync.Mutex + g := errgroup.WithNErrs(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + g.Go(func() error { + serverBF, err := client.cycleServerBloomFilter(ctx, req) + if false && intDataUpdateTracker.debug { + b, _ := json.MarshalIndent(serverBF, "", " ") + logger.Info("Disk %v, Bloom filter: %v", client.host.Name, string(b)) + } + // Keep lock while checking result. 
+ mu.Lock()
+ defer mu.Unlock()
+
+ if err != nil || !serverBF.Complete || bf == nil {
+ logger.LogIf(ctx, err)
+ bf = nil
+ return nil
+ }
+
+ var tmp bloom.BloomFilter
+ _, err = tmp.ReadFrom(bytes.NewBuffer(serverBF.Filter))
+ if err != nil {
+ logger.LogIf(ctx, err)
+ bf = nil
+ return nil
+ }
+ if bf.BloomFilter == nil {
+ bf.BloomFilter = &tmp
+ } else {
+ err = bf.Merge(&tmp)
+ if err != nil {
+ logger.LogIf(ctx, err)
+ bf = nil
+ return nil
+ }
+ }
+ return nil
+ }, idx)
+ }
+ g.Wait()
+ return bf, nil
+}
+
 // GetLocks - makes GetLocks RPC call on all peers.
 func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
 locksResp := make([]*PeerLocks, len(sys.peerClients))
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index 34f859eb2..a3a0111d6 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -60,7 +60,7 @@ type ObjectLayer interface {
 // Storage operations.
 Shutdown(context.Context) error
- CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error
+ CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error
 StorageInfo(ctx context.Context, local bool) StorageInfo // local queries only local disks
 // Bucket operations.
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go
index 18275222c..2bc9fad26 100644
--- a/cmd/peer-rest-client.go
+++ b/cmd/peer-rest-client.go
@@ -524,6 +524,25 @@ func (client *peerRESTClient) RemoveBucketObjectLockConfig(bucket string) error
 return nil
 }
+// cycleServerBloomFilter will cycle the bloom filter to start recording to index y if not already.
+// The response will contain a bloom filter starting at index x up to, but not including index y.
+// If y is 0, the response will not update y, but return the currently recorded information
+// from the current x to y-1.
+func (client *peerRESTClient) cycleServerBloomFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) {
+ var reader bytes.Buffer
+ err := gob.NewEncoder(&reader).Encode(req)
+ if err != nil {
+ return nil, err
+ }
+ respBody, err := client.call(peerRESTMethodCycleBloom, nil, &reader, -1)
+ if err != nil {
+ return nil, err
+ }
+ var resp bloomFilterResponse
+ defer http.DrainBody(respBody)
+ return &resp, gob.NewDecoder(respBody).Decode(&resp)
+}
+
 // SetBucketPolicy - Set bucket policy on the peer node.
 func (client *peerRESTClient) SetBucketPolicy(bucket string, bucketPolicy *policy.Policy) error {
 values := make(url.Values)
diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go
index e1a66c538..ad5c6a659 100644
--- a/cmd/peer-rest-common.go
+++ b/cmd/peer-rest-common.go
@@ -53,6 +53,7 @@ const (
 peerRESTMethodBucketPolicySet = "/setbucketpolicy"
 peerRESTMethodBucketNotificationPut = "/putbucketnotification"
 peerRESTMethodReloadFormat = "/reloadformat"
+ peerRESTMethodCycleBloom = "/cyclebloom"
 peerRESTMethodTrace = "/trace"
 peerRESTMethodListen = "/listen"
 peerRESTMethodLog = "/log"
diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go
index 3d06afa61..0cadb4905 100644
--- a/cmd/peer-rest-server.go
+++ b/cmd/peer-rest-server.go
@@ -773,6 +773,30 @@ func (s *peerRESTServer) SetBucketSSEConfigHandler(w http.ResponseWriter, r *htt
 w.(http.Flusher).Flush()
 }
+// CycleServerBloomFilterHandler cycles bloom filter on server. 
+func (s *peerRESTServer) CycleServerBloomFilterHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "CycleServerBloomFilter") + + var req bloomFilterRequest + err := gob.NewDecoder(r.Body).Decode(&req) + if err != nil { + s.writeErrorResponse(w, err) + return + } + bf, err := intDataUpdateTracker.cycleFilter(ctx, req.Oldest, req.Current) + if err != nil { + s.writeErrorResponse(w, err) + return + } + logger.LogIf(ctx, gob.NewEncoder(w).Encode(bf)) + w.(http.Flusher).Flush() +} + // PutBucketNotificationHandler - Set bucket policy. func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1124,6 +1148,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveOBDInfo).HandlerFunc(httpTraceHdrs(server.DriveOBDInfoHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetOBDInfo).HandlerFunc(httpTraceHdrs(server.NetOBDInfoHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDispatchNetOBDInfo).HandlerFunc(httpTraceHdrs(server.DispatchNetOBDInfoHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucket).HandlerFunc(httpTraceHdrs(server.DeleteBucketHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerUpdate).HandlerFunc(httpTraceHdrs(server.ServerUpdateHandler)).Queries(restQueries(peerRESTUpdateURL, peerRESTSha256Hex, peerRESTLatestRelease)...) diff --git a/cmd/server-main.go b/cmd/server-main.go index c19adb0e4..2278acf92 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -491,7 +491,6 @@ func serverMain(ctx *cli.Context) { // Initialize object layer with the supplied disks, objectLayer is nil upon any error. func newObjectLayer(ctx context.Context, endpointZones EndpointZones) (newObject ObjectLayer, err error) { // For FS only, directly use the disk. - if endpointZones.NEndpoints() == 1 { // Initialize new FS object layer. return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 3281a16ec..d66e83342 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -473,7 +473,8 @@ func (s *xlSets) StorageInfo(ctx context.Context, local bool) StorageInfo { return storageInfo } -func (s *xlSets) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { +func (s *xlSets) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { + // Use the zone-level implementation instead. return NotImplemented{} } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index dc3a04ca3..a5bcd50cf 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -42,6 +42,9 @@ func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRes // `policy.json, notification.xml, listeners.json`. 
func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ( result madmin.HealResultItem, err error) { + if !dryRun { + defer ObjectPathUpdated(bucket) + } storageDisks := xl.getDisks() diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 7fb5ce4b4..cb7e3761a 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -545,6 +545,8 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, return oi, toObjectErr(errFileParentIsFile, bucket, object) } + defer ObjectPathUpdated(path.Join(bucket, object)) + // Calculate s3 compatible md5sum for complete multipart. s3MD5 := getCompleteMultipartMD5(parts) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 526aa730a..98627e936 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -69,6 +69,8 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc // Check if this request is only metadata update. if cpSrcDstSame { + defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) + // Read metadata associated with the object from all disks. storageDisks := xl.getDisks() @@ -481,6 +483,7 @@ func (xl xlObjects) PutObject(ctx context.Context, bucket string, object string, // putObject wrapper for xl PutObject func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { + defer ObjectPathUpdated(path.Join(bucket, object)) data := r.Reader uniqueID := mustGetUUID() @@ -695,6 +698,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, writeQuorum int, isDir bool) error { var disks []StorageAPI var err error + defer ObjectPathUpdated(path.Join(bucket, object)) tmpObj := mustGetUUID() if bucket == minioMetaTmpBucket { @@ -755,6 +759,7 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects if errs[idx] != nil { continue } + tmpObjs[idx] = mustGetUUID() var err error // Rename the current object while requiring @@ -774,6 +779,7 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects if err != nil { errs[idx] = err } + ObjectPathUpdated(path.Join(bucket, objects[idx])) } } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 7cbe0014b..c272d10e1 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "errors" "fmt" "sort" "sync" @@ -25,6 +26,7 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bpool" + "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/dsync" "github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/sync/errgroup" @@ -200,24 +202,14 @@ func (xl xlObjects) GetMetrics(ctx context.Context) (*Metrics, error) { // CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed. // Updates are sent on a regular basis and the caller *must* consume them. 
-func (xl xlObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { - cache := make(chan dataUsageCache, 1) - defer close(cache) - buckets, err := xl.ListBuckets(ctx) - if err != nil { - return err - } - go func() { - for update := range cache { - updates <- update.dui(update.Info.Name, buckets) - } - }() - return xl.crawlAndGetDataUsage(ctx, buckets, cache) +func (xl xlObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { + // This should only be called from runDataUsageInfo and this setup should not happen (zones). + return errors.New("xlObjects CrawlAndGetDataUsage not implemented") } // CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed. // Updates are sent on a regular basis and the caller *must* consume them. -func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, updates chan<- dataUsageCache) error { +func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error { var disks []StorageAPI for _, d := range xl.getLoadBalancedDisks() { @@ -258,8 +250,14 @@ func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketIn for _, b := range buckets { e := oldCache.find(b.Name) if e != nil { - bucketCh <- b - cache.replace(b.Name, dataUsageRoot, *e) + if bf == nil || bf.containsDir(b.Name) { + bucketCh <- b + cache.replace(b.Name, dataUsageRoot, *e) + } else { + if intDataUpdateTracker.debug { + logger.Info(color.Green("crawlAndGetDataUsage:")+" Skipping bucket %v, not updated", b.Name) + } + } } } @@ -303,6 +301,9 @@ func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketIn cache.Info.NextCycle++ cache.Info.LastUpdate = time.Now() logger.LogIf(ctx, cache.save(ctx, xl, dataUsageCacheName)) + if intDataUpdateTracker.debug { + logger.Info(color.Green("crawlAndGetDataUsage:")+" Cache saved, Next Cycle: %d", cache.Info.NextCycle) + } updates <- cache }() @@ -339,7 +340,11 @@ func (xl xlObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketIn // Calc usage before := cache.Info.LastUpdate + if bf != nil { + cache.Info.BloomFilter = bf.bytes() + } cache, err = disk.CrawlAndGetDataUsage(ctx, cache) + cache.Info.BloomFilter = nil if err != nil { logger.LogIf(ctx, err) if cache.Info.LastUpdate.After(before) { diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index 18583e7a2..5c54df54d 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -64,8 +64,16 @@ func newXLZones(ctx context.Context, endpointZones EndpointZones) (ObjectLayer, storageDisks = make([][]StorageAPI, len(endpointZones)) z = &xlZones{zones: make([]*xlSets, len(endpointZones))} ) + + var localDrives []string + local := endpointZones.FirstLocal() for i, ep := range endpointZones { + for _, endpoint := range ep.Endpoints { + if endpoint.IsLocal { + localDrives = append(localDrives, endpoint.Path) + } + } storageDisks[i], formats[i], err = waitForFormatXL(local, ep.Endpoints, i+1, ep.SetCount, ep.DrivesPerSet, deploymentID) if err != nil { @@ -82,6 +90,8 @@ func newXLZones(ctx context.Context, endpointZones EndpointZones) (ObjectLayer, if !z.SingleZone() { z.quickHealBuckets(ctx) } + go intDataUpdateTracker.start(GlobalContext, localDrives...) 
+ return z, nil } @@ -217,7 +227,7 @@ func (z *xlZones) StorageInfo(ctx context.Context, local bool) StorageInfo { return storageInfo } -func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error { +func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { ctx, cancel := context.WithCancel(ctx) defer cancel() var wg sync.WaitGroup @@ -257,7 +267,7 @@ func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataU } }() // Start crawler. Blocks until done. - err := xl.crawlAndGetDataUsage(ctx, buckets, updates) + err := xl.crawlAndGetDataUsage(ctx, buckets, bf, updates) if err != nil { mu.Lock() if firstErr == nil { diff --git a/go.mod b/go.mod index 270dda83f..ce7771def 100644 --- a/go.mod +++ b/go.mod @@ -99,11 +99,14 @@ require ( github.com/sirupsen/logrus v1.5.0 github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 // indirect github.com/soheilhy/cmux v0.1.4 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 github.com/tinylib/msgp v1.1.1 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/ugorji/go v1.1.5-pre // indirect github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a + github.com/willf/bitset v1.1.10 // indirect + github.com/willf/bloom v2.0.3+incompatible github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.3 // indirect diff --git a/go.sum b/go.sum index c15d82c95..6a4366554 100644 --- a/go.sum +++ b/go.sum @@ -398,6 +398,8 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2 github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -424,6 +426,10 @@ github.com/ugorji/go/codec v1.1.5-pre h1:5YV9PsFAN+ndcCtTM7s60no7nY7eTG3LPtxhSwu github.com/ugorji/go/codec v1.1.5-pre/go.mod h1:tULtS6Gy1AE1yCENaw4Vb//HLH5njI2tfCQDUqRd8fI= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc= +github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= +github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA= +github.com/willf/bloom v2.0.3+incompatible/go.mod h1:MmAltL9pDMNTrvUkxdg0k0q5I0suxmuwp3KbyrZLOZ8= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod 
h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
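
The cycling protocol that cycleFilter, cycleServerBloomFilter and updateBloomFilter implement in this patch can be reduced to a short standalone sketch. Everything below (the tracker/cycle names, the sizing constants, the unsynchronised bookkeeping) is illustrative only; the patch additionally persists filters to disk, guards them with a mutex, and merges the per-peer results over the REST endpoint added above.

package main

import (
	"fmt"

	"github.com/willf/bloom"
)

type cycle struct {
	idx uint64
	bf  *bloom.BloomFilter
}

type tracker struct {
	current cycle
	history []cycle
}

// cycleTo starts recording into index 'next' and keeps the previous filter
// as history, mirroring dataUpdateTracker.cycleFilter.
func (t *tracker) cycleTo(next uint64) {
	if t.current.idx == next {
		return
	}
	t.history = append(t.history, t.current)
	t.current = cycle{idx: next, bf: bloom.NewWithEstimates(1000000, 0.01)}
}

// merged returns one filter covering indexes [oldest, newest] and whether every
// index in that range was available, like filterFrom/bloomFilterResponse.Complete.
func (t *tracker) merged(oldest, newest uint64) (*bloom.BloomFilter, bool) {
	out := bloom.NewWithEstimates(1000000, 0.01)
	complete := true
	for idx := oldest; idx <= newest; idx++ {
		var src *bloom.BloomFilter
		if t.current.idx == idx {
			src = t.current.bf
		}
		for _, h := range t.history {
			if h.idx == idx {
				src = h.bf
			}
		}
		if src == nil {
			complete = false
			continue
		}
		if err := out.Merge(src); err != nil {
			complete = false
		}
	}
	return out, complete
}

func main() {
	t := &tracker{current: cycle{idx: 1, bf: bloom.NewWithEstimates(1000000, 0.01)}}
	t.current.bf.Add([]byte("bucket/prefix")) // an update recorded during cycle 1

	t.cycleTo(2) // the crawler asks all peers to start cycle 2
	t.current.bf.Add([]byte("bucket2")) // an update recorded during cycle 2

	bf, complete := t.merged(1, 2)
	fmt.Println(complete, bf.Test([]byte("bucket/prefix")), bf.Test([]byte("bucket2"))) // true true true
}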