add data update tracking using bloom filter (#9208)
By monitoring PUT/DELETE and heal operations it is possible to track changed paths and keep a bloom filter for this data. This can help prioritize paths to scan. The bloom filter can identify paths that have not changed, and the few collisions will only result in a marginal extra workload. This can be implemented on either a bucket+(1 prefix level) with reasonable performance. The bloom filter is set to have a false positive rate at 1% at 1M entries. A bloom table of this size is about ~2500 bytes when serialized. To not force a full scan of all paths that have changed cycle bloom filters would need to be kept, so we guarantee that dirty paths have been scanned within cycle runs. Until cycle bloom filters have been collected all paths are considered dirty.master
parent
eff4127efd
commit
073aac3d92
@ -0,0 +1,607 @@ |
||||
/* |
||||
* MinIO Cloud Storage, (C) 2020 MinIO, Inc. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package cmd |
||||
|
||||
import ( |
||||
"bufio" |
||||
"bytes" |
||||
"context" |
||||
"encoding/binary" |
||||
"errors" |
||||
"io" |
||||
"io/ioutil" |
||||
"os" |
||||
"path" |
||||
"sort" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/minio/minio/cmd/config" |
||||
"github.com/minio/minio/cmd/logger" |
||||
"github.com/minio/minio/pkg/color" |
||||
"github.com/minio/minio/pkg/env" |
||||
"github.com/willf/bloom" |
||||
) |
||||
|
||||
const ( |
||||
// Estimate bloom filter size. With this many items
|
||||
dataUpdateTrackerEstItems = 1000000 |
||||
// ... we want this false positive rate:
|
||||
dataUpdateTrackerFP = 0.99 |
||||
dataUpdateTrackerQueueSize = 10000 |
||||
|
||||
dataUpdateTrackerVersion = 1 |
||||
dataUpdateTrackerFilename = minioMetaBucket + SlashSeparator + bucketMetaPrefix + SlashSeparator + ".tracker.bin" |
||||
dataUpdateTrackerSaveInterval = 5 * time.Minute |
||||
|
||||
// Reset bloom filters every n cycle
|
||||
dataUpdateTrackerResetEvery = 1000 |
||||
) |
||||
|
||||
var ( |
||||
objectUpdatedCh chan<- string |
||||
intDataUpdateTracker *dataUpdateTracker |
||||
) |
||||
|
||||
func init() { |
||||
intDataUpdateTracker = newDataUpdateTracker() |
||||
objectUpdatedCh = intDataUpdateTracker.input |
||||
} |
||||
|
||||
type dataUpdateTracker struct { |
||||
mu sync.Mutex |
||||
input chan string |
||||
save chan struct{} |
||||
debug bool |
||||
saveExited chan struct{} |
||||
|
||||
Current dataUpdateFilter |
||||
History dataUpdateTrackerHistory |
||||
Saved time.Time |
||||
} |
||||
|
||||
// newDataUpdateTracker returns a dataUpdateTracker with default settings.
|
||||
func newDataUpdateTracker() *dataUpdateTracker { |
||||
d := &dataUpdateTracker{ |
||||
Current: dataUpdateFilter{ |
||||
idx: 1, |
||||
}, |
||||
debug: env.Get(envDataUsageCrawlDebug, config.EnableOff) == config.EnableOn, |
||||
input: make(chan string, dataUpdateTrackerQueueSize), |
||||
save: make(chan struct{}, 1), |
||||
saveExited: make(chan struct{}), |
||||
} |
||||
d.Current.bf = d.newBloomFilter() |
||||
return d |
||||
} |
||||
|
||||
type dataUpdateTrackerHistory []dataUpdateFilter |
||||
|
||||
type dataUpdateFilter struct { |
||||
idx uint64 |
||||
bf bloomFilter |
||||
} |
||||
|
||||
type bloomFilter struct { |
||||
*bloom.BloomFilter |
||||
} |
||||
|
||||
// emptyBloomFilter returns an empty bloom filter.
|
||||
func emptyBloomFilter() bloomFilter { |
||||
return bloomFilter{BloomFilter: &bloom.BloomFilter{}} |
||||
} |
||||
|
||||
// containsDir returns whether the bloom filter contains a directory.
|
||||
// Note that objects in XL mode are also considered directories.
|
||||
func (b bloomFilter) containsDir(in string) bool { |
||||
split := splitPathDeterministic(path.Clean(in)) |
||||
|
||||
if len(split) == 0 { |
||||
return false |
||||
} |
||||
var tmp [dataUsageHashLen]byte |
||||
hashPath(path.Join(split...)).bytes(tmp[:]) |
||||
return b.Test(tmp[:]) |
||||
} |
||||
|
||||
// bytes returns the bloom filter serialized as a byte slice.
|
||||
func (b bloomFilter) bytes() []byte { |
||||
if b.BloomFilter == nil { |
||||
return nil |
||||
} |
||||
var buf bytes.Buffer |
||||
_, err := b.WriteTo(&buf) |
||||
if err != nil { |
||||
logger.LogIf(GlobalContext, err) |
||||
return nil |
||||
} |
||||
return buf.Bytes() |
||||
} |
||||
|
||||
// sort the dataUpdateTrackerHistory, newest first.
|
||||
// Returns whether the history is complete.
|
||||
func (d dataUpdateTrackerHistory) sort() bool { |
||||
if len(d) == 0 { |
||||
return true |
||||
} |
||||
sort.Slice(d, func(i, j int) bool { |
||||
return d[i].idx > d[j].idx |
||||
}) |
||||
return d[0].idx-d[len(d)-1].idx == uint64(len(d)) |
||||
} |
||||
|
||||
// removeOlderThan will remove entries older than index 'n'.
|
||||
func (d *dataUpdateTrackerHistory) removeOlderThan(n uint64) { |
||||
d.sort() |
||||
dd := *d |
||||
end := len(dd) |
||||
for i := end - 1; i >= 0; i-- { |
||||
if dd[i].idx < n { |
||||
end = i |
||||
} |
||||
} |
||||
dd = dd[:end] |
||||
*d = dd |
||||
} |
||||
|
||||
// newBloomFilter returns a new bloom filter with default settings.
|
||||
func (d *dataUpdateTracker) newBloomFilter() bloomFilter { |
||||
return bloomFilter{bloom.NewWithEstimates(dataUpdateTrackerEstItems, dataUpdateTrackerFP)} |
||||
} |
||||
|
||||
// current returns the current index.
|
||||
func (d *dataUpdateTracker) current() uint64 { |
||||
d.mu.Lock() |
||||
defer d.mu.Unlock() |
||||
return d.Current.idx |
||||
} |
||||
|
||||
// start will load the current data from the drives start collecting information and
|
||||
// start a saver goroutine.
|
||||
// All of these will exit when the context is canceled.
|
||||
func (d *dataUpdateTracker) start(ctx context.Context, drives ...string) { |
||||
if len(drives) <= 0 { |
||||
logger.LogIf(ctx, errors.New("dataUpdateTracker.start: No drives specified")) |
||||
return |
||||
} |
||||
d.load(ctx, drives...) |
||||
go d.startCollector(ctx) |
||||
go d.startSaver(ctx, dataUpdateTrackerSaveInterval, drives) |
||||
} |
||||
|
||||
// load will attempt to load data tracking information from the supplied drives.
|
||||
// The data will only be loaded if d.Saved is older than the one found on disk.
|
||||
// The newest working cache will be kept in d.
|
||||
// If no valid data usage tracker can be found d will remain unchanged.
|
||||
// If object is shared the caller should lock it.
|
||||
func (d *dataUpdateTracker) load(ctx context.Context, drives ...string) { |
||||
if len(drives) <= 0 { |
||||
logger.LogIf(ctx, errors.New("dataUpdateTracker.load: No drives specified")) |
||||
return |
||||
} |
||||
for _, drive := range drives { |
||||
cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename) |
||||
f, err := os.Open(cacheFormatPath) |
||||
if err != nil { |
||||
if os.IsNotExist(err) { |
||||
continue |
||||
} |
||||
logger.LogIf(ctx, err) |
||||
continue |
||||
} |
||||
err = d.deserialize(f, d.Saved) |
||||
if err != nil { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
f.Close() |
||||
} |
||||
} |
||||
|
||||
// startSaver will start a saver that will write d to all supplied drives at specific intervals.
|
||||
// The saver will save and exit when supplied context is closed.
|
||||
func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives []string) { |
||||
t := time.NewTicker(interval) |
||||
defer t.Stop() |
||||
var buf bytes.Buffer |
||||
d.mu.Lock() |
||||
saveNow := d.save |
||||
exited := make(chan struct{}) |
||||
d.saveExited = exited |
||||
d.mu.Unlock() |
||||
defer close(exited) |
||||
for { |
||||
var exit bool |
||||
select { |
||||
case <-ctx.Done(): |
||||
exit = true |
||||
case <-t.C: |
||||
case <-saveNow: |
||||
} |
||||
buf.Reset() |
||||
d.mu.Lock() |
||||
d.Saved = UTCNow() |
||||
err := d.serialize(&buf) |
||||
if d.debug { |
||||
logger.Info(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v", buf.Len(), d.Current.idx) |
||||
} |
||||
d.mu.Unlock() |
||||
if err != nil { |
||||
logger.LogIf(ctx, err, "Error serializing usage tracker data") |
||||
if exit { |
||||
return |
||||
} |
||||
continue |
||||
} |
||||
if buf.Len() == 0 { |
||||
logger.LogIf(ctx, errors.New("zero sized output, skipping save")) |
||||
continue |
||||
} |
||||
for _, drive := range drives { |
||||
cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename) |
||||
err := ioutil.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm) |
||||
if err != nil { |
||||
logger.LogIf(ctx, err) |
||||
continue |
||||
} |
||||
} |
||||
if exit { |
||||
return |
||||
} |
||||
} |
||||
} |
||||
|
||||
// serialize all data in d to dst.
|
||||
// Caller should hold lock if d is expected to be shared.
|
||||
// If an error is returned, there will likely be partial data written to dst.
|
||||
func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) { |
||||
ctx := GlobalContext |
||||
var tmp [8]byte |
||||
o := bufio.NewWriter(dst) |
||||
defer func() { |
||||
if err == nil { |
||||
err = o.Flush() |
||||
} |
||||
}() |
||||
|
||||
// Version
|
||||
if err := o.WriteByte(dataUpdateTrackerVersion); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
// Timestamp.
|
||||
binary.LittleEndian.PutUint64(tmp[:], uint64(d.Saved.Unix())) |
||||
if _, err := o.Write(tmp[:]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
// Current
|
||||
binary.LittleEndian.PutUint64(tmp[:], d.Current.idx) |
||||
if _, err := o.Write(tmp[:]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
if _, err := d.Current.bf.WriteTo(o); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
// History
|
||||
binary.LittleEndian.PutUint64(tmp[:], uint64(len(d.History))) |
||||
if _, err := o.Write(tmp[:]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
for _, bf := range d.History { |
||||
// Current
|
||||
binary.LittleEndian.PutUint64(tmp[:], bf.idx) |
||||
if _, err := o.Write(tmp[:]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
if _, err := bf.bf.WriteTo(o); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// deserialize will deserialize the supplied input if the input is newer than the supplied time.
|
||||
func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) error { |
||||
ctx := GlobalContext |
||||
var dst dataUpdateTracker |
||||
var tmp [8]byte |
||||
|
||||
// Version
|
||||
if _, err := io.ReadFull(src, tmp[:1]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
switch tmp[0] { |
||||
case dataUpdateTrackerVersion: |
||||
default: |
||||
return errors.New("dataUpdateTracker: Unknown data version") |
||||
} |
||||
// Timestamp.
|
||||
if _, err := io.ReadFull(src, tmp[:8]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
t := time.Unix(int64(binary.LittleEndian.Uint64(tmp[:])), 0) |
||||
if !t.After(newerThan) { |
||||
return nil |
||||
} |
||||
|
||||
// Current
|
||||
if _, err := io.ReadFull(src, tmp[:8]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
dst.Current.idx = binary.LittleEndian.Uint64(tmp[:]) |
||||
dst.Current.bf = emptyBloomFilter() |
||||
if _, err := dst.Current.bf.ReadFrom(src); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
// History
|
||||
if _, err := io.ReadFull(src, tmp[:8]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
n := binary.LittleEndian.Uint64(tmp[:]) |
||||
dst.History = make(dataUpdateTrackerHistory, int(n)) |
||||
for i, e := range dst.History { |
||||
if _, err := io.ReadFull(src, tmp[:8]); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
e.idx = binary.LittleEndian.Uint64(tmp[:]) |
||||
e.bf = emptyBloomFilter() |
||||
if _, err := e.bf.ReadFrom(src); err != nil { |
||||
if d.debug { |
||||
logger.LogIf(ctx, err) |
||||
} |
||||
return err |
||||
} |
||||
dst.History[i] = e |
||||
} |
||||
// Ignore what remains on the stream.
|
||||
// Update d:
|
||||
d.Current = dst.Current |
||||
d.History = dst.History |
||||
d.Saved = dst.Saved |
||||
return nil |
||||
} |
||||
|
||||
// start a collector that picks up entries from objectUpdatedCh
|
||||
// and adds them to the current bloom filter.
|
||||
func (d *dataUpdateTracker) startCollector(ctx context.Context) { |
||||
var tmp [dataUsageHashLen]byte |
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case in := <-d.input: |
||||
bucket, _ := path2BucketObjectWithBasePath("", in) |
||||
if bucket == "" { |
||||
if d.debug && len(in) > 0 { |
||||
logger.Info(color.Green("data-usage:")+" no bucket (%s)", in) |
||||
} |
||||
continue |
||||
} |
||||
|
||||
if isReservedOrInvalidBucket(bucket, false) { |
||||
if false && d.debug { |
||||
logger.Info(color.Green("data-usage:")+" isReservedOrInvalidBucket: %v, entry: %v", bucket, in) |
||||
} |
||||
continue |
||||
} |
||||
split := splitPathDeterministic(in) |
||||
|
||||
// Add all paths until level 3.
|
||||
d.mu.Lock() |
||||
for i := range split { |
||||
if d.debug && false { |
||||
logger.Info(color.Green("dataUpdateTracker:") + " Marking path dirty: " + color.Blue(path.Join(split[:i+1]...))) |
||||
} |
||||
hashPath(path.Join(split[:i+1]...)).bytes(tmp[:]) |
||||
d.Current.bf.Add(tmp[:]) |
||||
} |
||||
d.mu.Unlock() |
||||
} |
||||
} |
||||
} |
||||
|
||||
// find entry with specified index.
|
||||
// Returns nil if not found.
|
||||
func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter { |
||||
for _, f := range d { |
||||
if f.idx == idx { |
||||
return &f |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// filterFrom will return a combined bloom filter.
|
||||
func (d *dataUpdateTracker) filterFrom(ctx context.Context, oldest, newest uint64) *bloomFilterResponse { |
||||
bf := d.newBloomFilter() |
||||
bfr := bloomFilterResponse{ |
||||
OldestIdx: oldest, |
||||
CurrentIdx: d.Current.idx, |
||||
Complete: true, |
||||
} |
||||
// Loop through each index requested.
|
||||
for idx := oldest; idx <= newest; idx++ { |
||||
v := d.History.find(idx) |
||||
if v == nil { |
||||
if d.Current.idx == idx { |
||||
// Merge current.
|
||||
err := bf.Merge(d.Current.bf.BloomFilter) |
||||
logger.LogIf(ctx, err) |
||||
if err != nil { |
||||
bfr.Complete = false |
||||
} |
||||
continue |
||||
} |
||||
bfr.Complete = false |
||||
bfr.OldestIdx = idx + 1 |
||||
continue |
||||
} |
||||
|
||||
err := bf.Merge(v.bf.BloomFilter) |
||||
if err != nil { |
||||
bfr.Complete = false |
||||
logger.LogIf(ctx, err) |
||||
continue |
||||
} |
||||
bfr.NewestIdx = idx |
||||
} |
||||
var dst bytes.Buffer |
||||
_, err := bf.WriteTo(&dst) |
||||
if err != nil { |
||||
logger.LogIf(ctx, err) |
||||
return nil |
||||
} |
||||
bfr.Filter = dst.Bytes() |
||||
|
||||
return &bfr |
||||
} |
||||
|
||||
// cycleFilter will cycle the bloom filter to start recording to index y if not already.
|
||||
// The response will contain a bloom filter starting at index x up to, but not including index y.
|
||||
// If y is 0, the response will not update y, but return the currently recorded information
|
||||
// from the up until and including current y.
|
||||
func (d *dataUpdateTracker) cycleFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) { |
||||
d.mu.Lock() |
||||
defer d.mu.Unlock() |
||||
|
||||
if current == 0 { |
||||
if len(d.History) == 0 { |
||||
return d.filterFrom(ctx, d.Current.idx, d.Current.idx), nil |
||||
} |
||||
d.History.sort() |
||||
return d.filterFrom(ctx, d.History[len(d.History)-1].idx, d.Current.idx), nil |
||||
} |
||||
|
||||
// Move current to history if new one requested
|
||||
if d.Current.idx != current { |
||||
if d.debug { |
||||
logger.Info(color.Green("dataUpdateTracker:")+" cycle bloom filter: %v -> %v", d.Current.idx, current) |
||||
} |
||||
|
||||
d.History = append(d.History, d.Current) |
||||
d.Current.idx = current |
||||
d.Current.bf = d.newBloomFilter() |
||||
select { |
||||
case d.save <- struct{}{}: |
||||
default: |
||||
} |
||||
} |
||||
d.History.removeOlderThan(oldest) |
||||
return d.filterFrom(ctx, oldest, current), nil |
||||
} |
||||
|
||||
// splitPathDeterministic will split the provided relative path
|
||||
// deterministically and return up to the first 3 elements of the path.
|
||||
// Slash and dot prefixes are removed.
|
||||
// Trailing slashes are removed.
|
||||
// Returns 0 length if no parts are found after trimming.
|
||||
func splitPathDeterministic(in string) []string { |
||||
split := strings.Split(in, SlashSeparator) |
||||
|
||||
// Trim empty start/end
|
||||
for len(split) > 0 { |
||||
if len(split[0]) > 0 && split[0] != "." { |
||||
break |
||||
} |
||||
split = split[1:] |
||||
} |
||||
for len(split) > 0 { |
||||
if len(split[len(split)-1]) > 0 { |
||||
break |
||||
} |
||||
split = split[:len(split)-1] |
||||
} |
||||
|
||||
// Return up to 3 parts.
|
||||
if len(split) > 3 { |
||||
split = split[:3] |
||||
} |
||||
return split |
||||
} |
||||
|
||||
// bloomFilterRequest request bloom filters.
|
||||
// Current index will be updated to current and entries back to Oldest is returned.
|
||||
type bloomFilterRequest struct { |
||||
Oldest uint64 |
||||
Current uint64 |
||||
} |
||||
|
||||
type bloomFilterResponse struct { |
||||
// Current index being written to.
|
||||
CurrentIdx uint64 |
||||
// Oldest index in the returned bloom filter.
|
||||
OldestIdx uint64 |
||||
// Newest Index in the returned bloom filter.
|
||||
NewestIdx uint64 |
||||
// Are all indexes between oldest and newest filled?
|
||||
Complete bool |
||||
// Binary data of the bloom filter.
|
||||
Filter []byte |
||||
} |
||||
|
||||
// ObjectPathUpdated indicates a path has been updated.
|
||||
// The function will never block.
|
||||
func ObjectPathUpdated(s string) { |
||||
select { |
||||
case objectUpdatedCh <- s: |
||||
default: |
||||
} |
||||
} |
@ -0,0 +1,262 @@ |
||||
/* |
||||
* MinIO Cloud Storage, (C) 2020 MinIO, Inc. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package cmd |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"fmt" |
||||
"io/ioutil" |
||||
"math/rand" |
||||
"os" |
||||
"path" |
||||
"path/filepath" |
||||
"sync" |
||||
"testing" |
||||
|
||||
"github.com/minio/minio/cmd/logger" |
||||
"github.com/minio/minio/cmd/logger/message/log" |
||||
) |
||||
|
||||
type testLoggerI interface { |
||||
Helper() |
||||
Log(args ...interface{}) |
||||
} |
||||
|
||||
type testingLogger struct { |
||||
mu sync.Mutex |
||||
t testLoggerI |
||||
} |
||||
|
||||
func (t *testingLogger) Send(entry interface{}, errKind string) error { |
||||
t.mu.Lock() |
||||
defer t.mu.Unlock() |
||||
if t.t == nil { |
||||
return nil |
||||
} |
||||
e, ok := entry.(log.Entry) |
||||
if !ok { |
||||
return fmt.Errorf("unexpected log entry structure %#v", entry) |
||||
} |
||||
|
||||
t.t.Helper() |
||||
t.t.Log(e.Level, ":", errKind, e.Message) |
||||
return nil |
||||
} |
||||
|
||||
func addTestingLogging(t testLoggerI) func() { |
||||
tl := &testingLogger{t: t} |
||||
logger.AddTarget(tl) |
||||
return func() { |
||||
tl.mu.Lock() |
||||
defer tl.mu.Unlock() |
||||
tl.t = nil |
||||
} |
||||
} |
||||
|
||||
func TestDataUpdateTracker(t *testing.T) { |
||||
dut := newDataUpdateTracker() |
||||
// Change some defaults.
|
||||
dut.debug = testing.Verbose() |
||||
dut.input = make(chan string) |
||||
dut.save = make(chan struct{}) |
||||
|
||||
defer addTestingLogging(t)() |
||||
|
||||
dut.Current.bf = dut.newBloomFilter() |
||||
|
||||
tmpDir, err := ioutil.TempDir("", "TestDataUpdateTracker") |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
err = os.MkdirAll(filepath.Dir(filepath.Join(tmpDir, dataUpdateTrackerFilename)), os.ModePerm) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
defer os.RemoveAll(tmpDir) |
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
defer cancel() |
||||
dut.start(ctx, tmpDir) |
||||
|
||||
var tests = []struct { |
||||
in string |
||||
check []string // if not empty, check against these instead.
|
||||
exist bool |
||||
}{ |
||||
{ |
||||
in: "bucket/directory/file.txt", |
||||
check: []string{"bucket", "bucket/", "/bucket", "bucket/directory", "bucket/directory/", "bucket/directory/file.txt", "/bucket/directory/file.txt"}, |
||||
exist: true, |
||||
}, |
||||
{ |
||||
// System bucket
|
||||
in: ".minio.sys/ignoreme/pls", |
||||
exist: false, |
||||
}, |
||||
{ |
||||
// Not a valid bucket
|
||||
in: "./bucket/okfile.txt", |
||||
check: []string{"./bucket/okfile.txt", "/bucket/okfile.txt", "bucket/okfile.txt"}, |
||||
exist: false, |
||||
}, |
||||
{ |
||||
// Not a valid bucket
|
||||
in: "æ/okfile.txt", |
||||
check: []string{"æ/okfile.txt", "æ/okfile.txt", "æ"}, |
||||
exist: false, |
||||
}, |
||||
{ |
||||
in: "/bucket2/okfile2.txt", |
||||
check: []string{"./bucket2/okfile2.txt", "/bucket2/okfile2.txt", "bucket2/okfile2.txt", "bucket2"}, |
||||
exist: true, |
||||
}, |
||||
{ |
||||
in: "/bucket3/prefix/okfile2.txt", |
||||
check: []string{"./bucket3/prefix/okfile2.txt", "/bucket3/prefix/okfile2.txt", "bucket3/prefix/okfile2.txt", "bucket3/prefix", "bucket3"}, |
||||
exist: true, |
||||
}, |
||||
} |
||||
for _, tt := range tests { |
||||
t.Run(tt.in, func(t *testing.T) { |
||||
dut.input <- tt.in |
||||
dut.input <- "" // Sending empty string ensures the previous is added to filter.
|
||||
dut.mu.Lock() |
||||
defer dut.mu.Unlock() |
||||
if len(tt.check) == 0 { |
||||
got := dut.Current.bf.containsDir(tt.in) |
||||
if got != tt.exist { |
||||
// For unlimited tests this could lead to false positives,
|
||||
// but it should be deterministic.
|
||||
t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist) |
||||
} |
||||
return |
||||
} |
||||
for _, check := range tt.check { |
||||
got := dut.Current.bf.containsDir(check) |
||||
if got != tt.exist { |
||||
// For unlimited tests this could lead to false positives,
|
||||
// but it should be deterministic.
|
||||
t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist) |
||||
} |
||||
continue |
||||
} |
||||
}) |
||||
} |
||||
// Cycle to history
|
||||
_, err = dut.cycleFilter(ctx, 1, 2) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
dut.input <- "cycle2/file.txt" |
||||
dut.input <- "" // Sending empty string ensures the previous is added to filter.
|
||||
|
||||
tests = append(tests, struct { |
||||
in string |
||||
check []string |
||||
exist bool |
||||
}{in: "cycle2/file.txt", exist: true}) |
||||
|
||||
// Shut down
|
||||
cancel() |
||||
<-dut.saveExited |
||||
|
||||
if dut.current() != 2 { |
||||
t.Fatal("wrong current idx after save. want 2, got:", dut.current()) |
||||
} |
||||
|
||||
ctx, cancel = context.WithCancel(context.Background()) |
||||
defer cancel() |
||||
|
||||
// Reload...
|
||||
dut = newDataUpdateTracker() |
||||
dut.start(ctx, tmpDir) |
||||
|
||||
if dut.current() != 2 { |
||||
t.Fatal("current idx after load not preserved. want 2, got:", dut.current()) |
||||
} |
||||
bfr2, err := dut.cycleFilter(ctx, 1, 3) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
if !bfr2.Complete { |
||||
t.Fatal("Wanted complete, didn't get it") |
||||
} |
||||
if bfr2.CurrentIdx != 3 { |
||||
t.Fatal("wanted index 3, got", bfr2.CurrentIdx) |
||||
} |
||||
if bfr2.OldestIdx != 1 { |
||||
t.Fatal("wanted oldest index 3, got", bfr2.OldestIdx) |
||||
} |
||||
|
||||
// Rerun test with returned bfr2
|
||||
bf := dut.newBloomFilter() |
||||
_, err = bf.ReadFrom(bytes.NewBuffer(bfr2.Filter)) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
for _, tt := range tests { |
||||
t.Run(tt.in+"-reloaded", func(t *testing.T) { |
||||
if len(tt.check) == 0 { |
||||
got := bf.containsDir(tt.in) |
||||
if got != tt.exist { |
||||
// For unlimited tests this could lead to false positives,
|
||||
// but it should be deterministic.
|
||||
t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist) |
||||
} |
||||
return |
||||
} |
||||
for _, check := range tt.check { |
||||
got := bf.containsDir(check) |
||||
if got != tt.exist { |
||||
// For unlimited tests this could lead to false positives,
|
||||
// but it should be deterministic.
|
||||
t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist) |
||||
} |
||||
continue |
||||
} |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func BenchmarkDataUpdateTracker(b *testing.B) { |
||||
dut := newDataUpdateTracker() |
||||
// Change some defaults.
|
||||
dut.debug = false |
||||
dut.input = make(chan string) |
||||
dut.save = make(chan struct{}) |
||||
|
||||
defer addTestingLogging(b)() |
||||
|
||||
dut.Current.bf = dut.newBloomFilter() |
||||
// We do this unbuffered. This will very significantly reduce throughput, so this is a worst case.
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
defer cancel() |
||||
go dut.startCollector(ctx) |
||||
input := make([]string, 1000) |
||||
rng := rand.New(rand.NewSource(0xabad1dea)) |
||||
tmp := []string{"bucket", "aprefix", "nextprefixlevel", "maybeobjname", "evendeeper", "ok-one-morelevel", "final.object"} |
||||
for i := range input { |
||||
tmp := tmp[:1+rng.Intn(cap(tmp)-1)] |
||||
input[i] = path.Join(tmp...) |
||||
} |
||||
b.SetBytes(1) |
||||
b.ResetTimer() |
||||
b.ReportAllocs() |
||||
for i := 0; i < b.N; i++ { |
||||
dut.input <- input[rng.Intn(len(input))] |
||||
} |
||||
} |
Loading…
Reference in new issue