crawler: Stream bucket usage cache data (#11068)

Stream bucket caches to storage and through RPC calls.
master
Klaus Post 4 years ago committed by GitHub
parent 82e2be4239
commit 4bca62a0bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      cmd/data-usage-cache.go
  2. 11
      cmd/data-usage_test.go
  3. 2
      cmd/metacache-set.go
  4. 27
      cmd/storage-rest-client.go
  5. 8
      cmd/storage-rest-server.go

@ -489,9 +489,12 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
// save the content of the cache to minioMetaBackgroundOpsBucket with the provided name. // save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error { func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
b := d.serialize() pr, pw := io.Pipe()
size := int64(len(b)) go func() {
r, err := hash.NewReader(bytes.NewReader(b), size, "", "", size, false) pw.CloseWithError(d.serializeTo(pw))
}()
defer pr.Close()
r, err := hash.NewReader(pr, -1, "", "", -1, false)
if err != nil { if err != nil {
return err return err
} }
@ -513,32 +516,33 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
const dataUsageCacheVer = 3 const dataUsageCacheVer = 3
// serialize the contents of the cache. // serialize the contents of the cache.
func (d *dataUsageCache) serialize() []byte { func (d *dataUsageCache) serializeTo(dst io.Writer) error {
// Prepend version and compress. // Add version and compress.
dst := make([]byte, 0, d.Msgsize()+1) _, err := dst.Write([]byte{dataUsageCacheVer})
dst = append(dst, dataUsageCacheVer) if err != nil {
buf := bytes.NewBuffer(dst) return err
enc, err := zstd.NewWriter(buf, }
enc, err := zstd.NewWriter(dst,
zstd.WithEncoderLevel(zstd.SpeedFastest), zstd.WithEncoderLevel(zstd.SpeedFastest),
zstd.WithWindowSize(1<<20), zstd.WithWindowSize(1<<20),
zstd.WithEncoderConcurrency(2)) zstd.WithEncoderConcurrency(2))
if err != nil { if err != nil {
logger.LogIf(GlobalContext, err) return err
return nil
} }
mEnc := msgp.NewWriter(enc) mEnc := msgp.NewWriter(enc)
err = d.EncodeMsg(mEnc) err = d.EncodeMsg(mEnc)
if err != nil { if err != nil {
logger.LogIf(GlobalContext, err) return err
return nil }
err = mEnc.Flush()
if err != nil {
return err
} }
mEnc.Flush()
err = enc.Close() err = enc.Close()
if err != nil { if err != nil {
logger.LogIf(GlobalContext, err) return err
return nil
} }
return buf.Bytes() return nil
} }
// deserialize the supplied byte slice into the cache. // deserialize the supplied byte slice into the cache.

@ -662,14 +662,17 @@ func TestDataUsageCacheSerialize(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
var buf bytes.Buffer
b := want.serialize() err = want.serializeTo(&buf)
if err != nil {
t.Fatal(err)
}
t.Log("serialized size:", buf.Len(), "bytes")
var got dataUsageCache var got dataUsageCache
err = got.deserialize(bytes.NewBuffer(b)) err = got.deserialize(&buf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Log("serialized size:", len(b), "bytes")
if got.Info.LastUpdate.IsZero() { if got.Info.LastUpdate.IsZero() {
t.Error("lastupdate not set") t.Error("lastupdate not set")
} }

@ -828,7 +828,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
ReportNotFound: opts.reportNotFound, ReportNotFound: opts.reportNotFound,
FilterPrefix: opts.filterPrefix}, w) FilterPrefix: opts.filterPrefix}, w)
w.CloseWithError(err) w.CloseWithError(err)
if err != io.EOF { if err != io.EOF && err != nil && err.Error() != errFileNotFound.Error() {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
}() }()

@ -28,6 +28,7 @@ import (
"path" "path"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/http"
@ -170,18 +171,34 @@ func (client *storageRESTClient) Healing() bool {
} }
func (client *storageRESTClient) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { func (client *storageRESTClient) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
b := cache.serialize() pr, pw := io.Pipe()
respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, bytes.NewBuffer(b), int64(len(b))) go func() {
pw.CloseWithError(cache.serializeTo(pw))
}()
defer pr.Close()
respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, pr, -1)
defer http.DrainBody(respBody) defer http.DrainBody(respBody)
if err != nil { if err != nil {
return cache, err return cache, err
} }
reader, err := waitForHTTPResponse(respBody)
var wg sync.WaitGroup
var newCache dataUsageCache
var decErr error
pr, pw = io.Pipe()
wg.Add(1)
go func() {
defer wg.Done()
decErr = newCache.deserialize(pr)
pr.CloseWithError(err)
}()
err = waitForHTTPStream(respBody, pw)
pw.CloseWithError(err)
if err != nil { if err != nil {
return cache, err return cache, err
} }
var newCache dataUsageCache wg.Wait()
return newCache, newCache.deserialize(reader) return newCache, decErr
} }
func (client *storageRESTClient) GetDiskID() (string, error) { func (client *storageRESTClient) GetDiskID() (string, error) {

@ -171,14 +171,13 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r
return return
} }
done := keepHTTPResponseAlive(w) resp := streamHTTPResponse(w)
usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache) usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache)
done(err)
if err != nil { if err != nil {
resp.CloseWithError(err)
return return
} }
w.Write(usageInfo.serialize()) resp.CloseWithError(usageInfo.serializeTo(resp))
w.(http.Flusher).Flush()
} }
// MakeVolHandler - make a volume. // MakeVolHandler - make a volume.
@ -889,7 +888,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
if err != nil { if err != nil {
return err return err
} }
length := binary.LittleEndian.Uint32(tmp[:]) length := binary.LittleEndian.Uint32(tmp[:])
_, err = io.CopyN(w, respBody, int64(length)) _, err = io.CopyN(w, respBody, int64(length))
if err != nil { if err != nil {

Loading…
Cancel
Save