|
|
@ -69,6 +69,7 @@ type dataUpdateTracker struct { |
|
|
|
save chan struct{} |
|
|
|
save chan struct{} |
|
|
|
debug bool |
|
|
|
debug bool |
|
|
|
saveExited chan struct{} |
|
|
|
saveExited chan struct{} |
|
|
|
|
|
|
|
dirty bool |
|
|
|
|
|
|
|
|
|
|
|
Current dataUpdateFilter |
|
|
|
Current dataUpdateFilter |
|
|
|
History dataUpdateTrackerHistory |
|
|
|
History dataUpdateTrackerHistory |
|
|
@ -87,6 +88,7 @@ func newDataUpdateTracker() *dataUpdateTracker { |
|
|
|
saveExited: make(chan struct{}), |
|
|
|
saveExited: make(chan struct{}), |
|
|
|
} |
|
|
|
} |
|
|
|
d.Current.bf = d.newBloomFilter() |
|
|
|
d.Current.bf = d.newBloomFilter() |
|
|
|
|
|
|
|
d.dirty = true |
|
|
|
return d |
|
|
|
return d |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -235,11 +237,16 @@ func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Durati |
|
|
|
} |
|
|
|
} |
|
|
|
buf.Reset() |
|
|
|
buf.Reset() |
|
|
|
d.mu.Lock() |
|
|
|
d.mu.Lock() |
|
|
|
|
|
|
|
if !d.dirty { |
|
|
|
|
|
|
|
d.mu.Unlock() |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
d.Saved = UTCNow() |
|
|
|
d.Saved = UTCNow() |
|
|
|
err := d.serialize(&buf) |
|
|
|
err := d.serialize(&buf) |
|
|
|
if d.debug { |
|
|
|
if d.debug { |
|
|
|
logger.Info(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v", buf.Len(), d.Current.idx) |
|
|
|
logger.Info(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v", buf.Len(), d.Current.idx) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
d.dirty = false |
|
|
|
d.mu.Unlock() |
|
|
|
d.mu.Unlock() |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
logger.LogIf(ctx, err, "Error serializing usage tracker data") |
|
|
|
logger.LogIf(ctx, err, "Error serializing usage tracker data") |
|
|
@ -459,6 +466,7 @@ func (d *dataUpdateTracker) startCollector(ctx context.Context) { |
|
|
|
hashPath(path.Join(split[:i+1]...)).bytes(tmp[:]) |
|
|
|
hashPath(path.Join(split[:i+1]...)).bytes(tmp[:]) |
|
|
|
d.Current.bf.Add(tmp[:]) |
|
|
|
d.Current.bf.Add(tmp[:]) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
d.dirty = d.dirty || len(split) > 0 |
|
|
|
d.mu.Unlock() |
|
|
|
d.mu.Unlock() |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -527,7 +535,6 @@ func (d *dataUpdateTracker) filterFrom(ctx context.Context, oldest, newest uint6 |
|
|
|
func (d *dataUpdateTracker) cycleFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) { |
|
|
|
func (d *dataUpdateTracker) cycleFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) { |
|
|
|
d.mu.Lock() |
|
|
|
d.mu.Lock() |
|
|
|
defer d.mu.Unlock() |
|
|
|
defer d.mu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
if current == 0 { |
|
|
|
if current == 0 { |
|
|
|
if len(d.History) == 0 { |
|
|
|
if len(d.History) == 0 { |
|
|
|
return d.filterFrom(ctx, d.Current.idx, d.Current.idx), nil |
|
|
|
return d.filterFrom(ctx, d.Current.idx, d.Current.idx), nil |
|
|
@ -538,6 +545,7 @@ func (d *dataUpdateTracker) cycleFilter(ctx context.Context, oldest, current uin |
|
|
|
|
|
|
|
|
|
|
|
// Move current to history if new one requested
|
|
|
|
// Move current to history if new one requested
|
|
|
|
if d.Current.idx != current { |
|
|
|
if d.Current.idx != current { |
|
|
|
|
|
|
|
d.dirty = true |
|
|
|
if d.debug { |
|
|
|
if d.debug { |
|
|
|
logger.Info(color.Green("dataUpdateTracker:")+" cycle bloom filter: %v -> %v", d.Current.idx, current) |
|
|
|
logger.Info(color.Green("dataUpdateTracker:")+" cycle bloom filter: %v -> %v", d.Current.idx, current) |
|
|
|
} |
|
|
|
} |
|
|
|