Add replication capacity metrics support in crawler (#10786)

Branch: master
Author: Ritesh H Shukla, committed by GitHub
Parent: 6d70f6a4ac
Commit: 038bcd9079
10 changed files:

1. cmd/data-crawler.go — 36 lines changed
2. cmd/data-usage-cache.go — 32 lines changed
3. cmd/data-usage-cache_gen.go — 82 lines changed
4. cmd/data-usage.go — 5 lines changed
5. cmd/data-usage_test.go — 36 lines changed
6. cmd/fs-v1.go — 10 lines changed
7. cmd/metrics.go — 36 lines changed
8. cmd/object-api-datatypes.go — 16 lines changed
9. cmd/storage-rest-server.go — 1 line changed
10. cmd/xl-storage.go — 14 lines changed

@@ -445,16 +445,16 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder
 			lifeCycle: activeLifeCycle,
 			heal:      thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv),
 		}
-		size, err := f.getSize(item)
+		sizeSummary, err := f.getSize(item)
 		wait()
 		if err == errSkipFile {
 			return nil
 		}
 		logger.LogIf(ctx, err)
-		cache.Size += size
+		cache.addSizes(sizeSummary)
 		cache.Objects++
-		cache.ObjSizes.add(size)
+		cache.ObjSizes.add(sizeSummary.totalSize)
 		return nil
 	})
@@ -673,7 +673,7 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder)
 			}
 		}
-		size, err := f.getSize(
+		sizeSummary, err := f.getSize(
 			crawlItem{
 				Path: fileName,
 				Typ:  typ,
@@ -692,9 +692,9 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder)
 			return nil
 		}
 		logger.LogIf(ctx, err)
-		cache.Size += size
+		cache.addSizes(sizeSummary)
 		cache.Objects++
-		cache.ObjSizes.add(size)
+		cache.ObjSizes.add(sizeSummary.totalSize)
 		return nil
 	}
 	err := readDirFn(path.Join(dirStack...), addDir)
@@ -717,7 +717,15 @@ type crawlItem struct {
 	debug bool
 }

-type getSizeFn func(item crawlItem) (int64, error)
+type sizeSummary struct {
+	totalSize      int64
+	replicatedSize int64
+	pendingSize    int64
+	failedSize     int64
+	replicaSize    int64
+}
+
+type getSizeFn func(item crawlItem) (sizeSummary, error)

 // transformMetaDir will transform a directory to prefix/file.ext
 func (i *crawlItem) transformMetaDir() {
@@ -910,7 +918,7 @@ func (i *crawlItem) objectPath() string {
 }

 // healReplication will heal a scanned item that has failed replication.
-func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta) {
+func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta, sizeS *sizeSummary) {
 	if meta.oi.DeleteMarker || !meta.oi.VersionPurgeStatus.Empty() {
 		// heal delete marker replication failure or versioned delete replication failure
 		if meta.oi.ReplicationStatus == replication.Pending ||
@@ -920,9 +928,17 @@ func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta) {
 			return
 		}
 	}
-	if meta.oi.ReplicationStatus == replication.Pending ||
-		meta.oi.ReplicationStatus == replication.Failed {
-		globalReplicationState.queueReplicaTask(meta.oi)
+	switch meta.oi.ReplicationStatus {
+	case replication.Pending:
+		sizeS.pendingSize += meta.oi.Size
+		globalReplicationState.queueReplicaTask(meta.oi)
+	case replication.Failed:
+		sizeS.failedSize += meta.oi.Size
+		globalReplicationState.queueReplicaTask(meta.oi)
+	case replication.Complete:
+		sizeS.replicatedSize += meta.oi.Size
+	case replication.Replica:
+		sizeS.replicaSize += meta.oi.Size
 	}
 }
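Taken together, the data-crawler.go changes swap the scalar size returned by getSizeFn for a sizeSummary, and the new switch attributes each scanned object's size to exactly one replication counter, re-queueing Pending and Failed objects for replication on the way. A minimal, self-contained sketch of that accounting rule (the names below are illustrative stand-ins, not MinIO's actual types):

package main

import "fmt"

// replStatus is a stand-in for MinIO's replication.StatusType.
type replStatus string

const (
	pending  replStatus = "PENDING"
	failed   replStatus = "FAILED"
	complete replStatus = "COMPLETED"
	replica  replStatus = "REPLICA"
)

type sizeSummary struct {
	totalSize, replicatedSize, pendingSize, failedSize, replicaSize int64
}

// account mirrors the switch in healReplication: every object counts toward
// totalSize, and toward at most one replication bucket.
func (s *sizeSummary) account(status replStatus, size int64) {
	s.totalSize += size
	switch status {
	case pending:
		s.pendingSize += size
	case failed:
		s.failedSize += size
	case complete:
		s.replicatedSize += size
	case replica:
		s.replicaSize += size
	}
}

func main() {
	var s sizeSummary
	s.account(complete, 100)
	s.account(pending, 40)
	fmt.Printf("%+v\n", s) // totalSize:140, replicatedSize:100, pendingSize:40
}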

@@ -47,9 +47,12 @@ type sizeHistogram [dataUsageBucketLen]uint64

 type dataUsageEntry struct {
 	// These fields do not include any children.
 	Size int64
+	ReplicatedSize         uint64
+	ReplicationPendingSize uint64
+	ReplicationFailedSize  uint64
+	ReplicaSize            uint64
 	Objects  uint64
 	ObjSizes sizeHistogram
 	Children dataUsageHashMap
 }
@@ -76,10 +79,23 @@ type dataUsageCacheInfo struct {
 	lifeCycle *lifecycle.Lifecycle `msg:"-"`
 }

+func (e *dataUsageEntry) addSizes(summary sizeSummary) {
+	e.Size += summary.totalSize
+	e.ReplicatedSize += uint64(summary.replicatedSize)
+	e.ReplicationFailedSize += uint64(summary.failedSize)
+	e.ReplicationPendingSize += uint64(summary.pendingSize)
+	e.ReplicaSize += uint64(summary.replicaSize)
+}
+
 // merge other data usage entry into this, excluding children.
 func (e *dataUsageEntry) merge(other dataUsageEntry) {
 	e.Objects += other.Objects
 	e.Size += other.Size
+	e.ReplicationPendingSize += other.ReplicationPendingSize
+	e.ReplicationFailedSize += other.ReplicationFailedSize
+	e.ReplicatedSize += other.ReplicatedSize
+	e.ReplicaSize += other.ReplicaSize
 	for i, v := range other.ObjSizes[:] {
 		e.ObjSizes[i] += v
 	}
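addSizes folds one crawl result into an entry, and merge then rolls child entries up into their parents, so the per-folder replication counters aggregate naturally into bucket and cluster totals. Note that addSizes converts the int64 summary fields to uint64, which assumes the crawler never reports a negative size. A small roll-up sketch with a pared-down stand-in struct and made-up values:

package main

import "fmt"

// entry is a simplified stand-in for dataUsageEntry.
type entry struct {
	Size                   int64
	ReplicatedSize         uint64
	ReplicationPendingSize uint64
}

// merge adds other's counters into e, mirroring dataUsageEntry.merge.
func (e *entry) merge(other entry) {
	e.Size += other.Size
	e.ReplicatedSize += other.ReplicatedSize
	e.ReplicationPendingSize += other.ReplicationPendingSize
}

func main() {
	// Two child folders rolled up into the bucket root.
	root := entry{}
	root.merge(entry{Size: 100, ReplicatedSize: 80})
	root.merge(entry{Size: 50, ReplicationPendingSize: 50})
	fmt.Printf("%+v\n", root) // {Size:150 ReplicatedSize:80 ReplicationPendingSize:50}
}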
@@ -216,6 +232,10 @@ func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo {
 		LastUpdate:        d.Info.LastUpdate,
 		ObjectsTotalCount: flat.Objects,
 		ObjectsTotalSize:  uint64(flat.Size),
+		ReplicatedSize:         flat.ReplicatedSize,
+		ReplicationFailedSize:  flat.ReplicationFailedSize,
+		ReplicationPendingSize: flat.ReplicationPendingSize,
+		ReplicaSize:            flat.ReplicaSize,
 		BucketsCount:      uint64(len(e.Children)),
 		BucketsUsage:      d.bucketsUsageInfo(buckets),
 	}
@@ -345,6 +365,10 @@ func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]BucketUsageInfo {
 		dst[bucket.Name] = BucketUsageInfo{
 			Size:         uint64(flat.Size),
 			ObjectsCount: flat.Objects,
+			ReplicationPendingSize: flat.ReplicationPendingSize,
+			ReplicatedSize:         flat.ReplicatedSize,
+			ReplicationFailedSize:  flat.ReplicationFailedSize,
+			ReplicaSize:            flat.ReplicaSize,
 			ObjectSizesHistogram: flat.ObjSizes.toMap(),
 		}
 	}
@@ -362,6 +386,10 @@ func (d *dataUsageCache) bucketUsageInfo(bucket string) BucketUsageInfo {
 	return BucketUsageInfo{
 		Size:         uint64(flat.Size),
 		ObjectsCount: flat.Objects,
+		ReplicationPendingSize: flat.ReplicationPendingSize,
+		ReplicatedSize:         flat.ReplicatedSize,
+		ReplicationFailedSize:  flat.ReplicationFailedSize,
+		ReplicaSize:            flat.ReplicaSize,
 		ObjectSizesHistogram: flat.ObjSizes.toMap(),
 	}
 }
@@ -482,7 +510,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
 // dataUsageCacheVer indicates the cache version.
 // Bumping the cache version will drop data from previous versions
 // and write new data with the new version.
-const dataUsageCacheVer = 2
+const dataUsageCacheVer = 3

 // serialize the contents of the cache.
 func (d *dataUsageCache) serialize() []byte {
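Bumping dataUsageCacheVer from 2 to 3 is what keeps the serialization change safe: per the comment, caches written under an older version are dropped and rebuilt by the next crawl rather than misread. The load/serialize bodies are not part of this diff, but the gate presumably looks something like this sketch (the one-byte version prefix is an assumption, not MinIO's confirmed layout):

import "errors"

// checkCacheVersion is a sketch only: it assumes the serialized cache is
// prefixed with a one-byte version, as the dataUsageCacheVer comment implies.
func checkCacheVersion(data []byte) ([]byte, error) {
	const dataUsageCacheVer = 3
	if len(data) == 0 || data[0] != dataUsageCacheVer {
		// Version mismatch: discard and let the next crawl rebuild the cache.
		return nil, errors.New("data usage cache: unknown version")
	}
	return data[1:], nil // the rest is the msgp-encoded payload
}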

@@ -492,8 +492,8 @@ func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) {
 		err = msgp.WrapError(err)
 		return
 	}
-	if zb0001 != 4 {
-		err = msgp.ArrayError{Wanted: 4, Got: zb0001}
+	if zb0001 != 8 {
+		err = msgp.ArrayError{Wanted: 8, Got: zb0001}
 		return
 	}
 	z.Size, err = dc.ReadInt64()
@@ -501,6 +501,26 @@ func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) {
 		err = msgp.WrapError(err, "Size")
 		return
 	}
+	z.ReplicatedSize, err = dc.ReadUint64()
+	if err != nil {
+		err = msgp.WrapError(err, "ReplicatedSize")
+		return
+	}
+	z.ReplicationPendingSize, err = dc.ReadUint64()
+	if err != nil {
+		err = msgp.WrapError(err, "ReplicationPendingSize")
+		return
+	}
+	z.ReplicationFailedSize, err = dc.ReadUint64()
+	if err != nil {
+		err = msgp.WrapError(err, "ReplicationFailedSize")
+		return
+	}
+	z.ReplicaSize, err = dc.ReadUint64()
+	if err != nil {
+		err = msgp.WrapError(err, "ReplicaSize")
+		return
+	}
 	z.Objects, err = dc.ReadUint64()
 	if err != nil {
 		err = msgp.WrapError(err, "Objects")
@@ -533,8 +553,8 @@ func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) {

 // EncodeMsg implements msgp.Encodable
 func (z *dataUsageEntry) EncodeMsg(en *msgp.Writer) (err error) {
-	// array header, size 4
-	err = en.Append(0x94)
+	// array header, size 8
+	err = en.Append(0x98)
 	if err != nil {
 		return
 	}
@@ -543,6 +563,26 @@ func (z *dataUsageEntry) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "Size")
 		return
 	}
+	err = en.WriteUint64(z.ReplicatedSize)
+	if err != nil {
+		err = msgp.WrapError(err, "ReplicatedSize")
+		return
+	}
+	err = en.WriteUint64(z.ReplicationPendingSize)
+	if err != nil {
+		err = msgp.WrapError(err, "ReplicationPendingSize")
+		return
+	}
+	err = en.WriteUint64(z.ReplicationFailedSize)
+	if err != nil {
+		err = msgp.WrapError(err, "ReplicationFailedSize")
+		return
+	}
+	err = en.WriteUint64(z.ReplicaSize)
+	if err != nil {
+		err = msgp.WrapError(err, "ReplicaSize")
+		return
+	}
 	err = en.WriteUint64(z.Objects)
 	if err != nil {
 		err = msgp.WrapError(err, "Objects")
@@ -571,9 +611,13 @@ func (z *dataUsageEntry) EncodeMsg(en *msgp.Writer) (err error) {

 // MarshalMsg implements msgp.Marshaler
 func (z *dataUsageEntry) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// array header, size 4
-	o = append(o, 0x94)
+	// array header, size 8
+	o = append(o, 0x98)
 	o = msgp.AppendInt64(o, z.Size)
+	o = msgp.AppendUint64(o, z.ReplicatedSize)
+	o = msgp.AppendUint64(o, z.ReplicationPendingSize)
+	o = msgp.AppendUint64(o, z.ReplicationFailedSize)
+	o = msgp.AppendUint64(o, z.ReplicaSize)
 	o = msgp.AppendUint64(o, z.Objects)
 	o = msgp.AppendArrayHeader(o, uint32(dataUsageBucketLen))
 	for za0001 := range z.ObjSizes {
@@ -595,8 +639,8 @@ func (z *dataUsageEntry) UnmarshalMsg(bts []byte) (o []byte, err error) {
 		err = msgp.WrapError(err)
 		return
 	}
-	if zb0001 != 4 {
-		err = msgp.ArrayError{Wanted: 4, Got: zb0001}
+	if zb0001 != 8 {
+		err = msgp.ArrayError{Wanted: 8, Got: zb0001}
 		return
 	}
 	z.Size, bts, err = msgp.ReadInt64Bytes(bts)
z.Size, bts, err = msgp.ReadInt64Bytes(bts)
@ -604,6 +648,26 @@ func (z *dataUsageEntry) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Size")
return
}
z.ReplicatedSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicatedSize")
return
}
z.ReplicationPendingSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicationPendingSize")
return
}
z.ReplicationFailedSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicationFailedSize")
return
}
z.ReplicaSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicaSize")
return
}
z.Objects, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Objects")
@@ -637,7 +701,7 @@ func (z *dataUsageEntry) UnmarshalMsg(bts []byte) (o []byte, err error) {

 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *dataUsageEntry) Msgsize() (s int) {
-	s = 1 + msgp.Int64Size + msgp.Uint64Size + msgp.ArrayHeaderSize + (dataUsageBucketLen * (msgp.Uint64Size)) + z.Children.Msgsize()
+	s = 1 + msgp.Int64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.ArrayHeaderSize + (dataUsageBucketLen * (msgp.Uint64Size)) + z.Children.Msgsize()
 	return
 }
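The generated code also shows why old caches cannot be read in place: msgp encodes dataUsageEntry as a MessagePack fixarray, whose single header byte is 0x90 | length. Going from a four-element array to an eight-element one changes that byte from 0x94 to 0x98, so decoding an old payload fails the Wanted: 8 check immediately. The header arithmetic:

package main

import "fmt"

func main() {
	// MessagePack fixarray header: high nibble 0x9, low nibble = element count (0-15).
	for _, n := range []byte{4, 8} {
		fmt.Printf("fixarray(%d) header byte: 0x%x\n", n, 0x90|n)
	}
	// Output:
	// fixarray(4) header byte: 0x94
	// fixarray(8) header byte: 0x98
}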

@@ -39,8 +39,8 @@ const (
 )

 // storeDataUsageInBackend will store all objects sent on the gui channel until closed.
-func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, gui <-chan DataUsageInfo) {
-	for dataUsageInfo := range gui {
+func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan DataUsageInfo) {
+	for dataUsageInfo := range dui {
 		dataUsageJSON, err := json.Marshal(dataUsageInfo)
 		if err != nil {
 			logger.LogIf(ctx, err)
@@ -52,7 +52,6 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, gui <-chan DataUsageInfo) {
 			logger.LogIf(ctx, err)
 			continue
 		}
 		_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageObjName, NewPutObjReader(r, nil, nil), ObjectOptions{})
 		if !isErrBucketNotFound(err) {
 			logger.LogIf(ctx, err)

@@ -51,15 +51,17 @@ func TestDataUsageUpdate(t *testing.T) {
 	}
 	createUsageTestFiles(t, base, bucket, files)

-	getSize := func(item crawlItem) (i int64, err error) {
+	getSize := func(item crawlItem) (sizeS sizeSummary, err error) {
 		if item.Typ&os.ModeDir == 0 {
-			s, err := os.Stat(item.Path)
+			var s os.FileInfo
+			s, err = os.Stat(item.Path)
 			if err != nil {
-				return 0, err
+				return
 			}
-			return s.Size(), nil
+			sizeS.totalSize = s.Size()
+			return sizeS, nil
 		}
-		return 0, nil
+		return
 	}

 	got, err := crawlDataFolder(context.Background(), base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize)
@@ -345,15 +347,17 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
 	}
 	createUsageTestFiles(t, base, "", files)

-	getSize := func(item crawlItem) (i int64, err error) {
+	getSize := func(item crawlItem) (sizeS sizeSummary, err error) {
 		if item.Typ&os.ModeDir == 0 {
-			s, err := os.Stat(item.Path)
+			var s os.FileInfo
+			s, err = os.Stat(item.Path)
 			if err != nil {
-				return 0, err
+				return
 			}
-			return s.Size(), nil
+			sizeS.totalSize = s.Size()
+			return
 		}
-		return 0, nil
+		return
 	}

 	got, err := crawlDataFolder(context.Background(), base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize)
 	if err != nil {
@@ -642,15 +646,17 @@ func TestDataUsageCacheSerialize(t *testing.T) {
 	}
 	createUsageTestFiles(t, base, bucket, files)

-	getSize := func(item crawlItem) (i int64, err error) {
+	getSize := func(item crawlItem) (sizeS sizeSummary, err error) {
 		if item.Typ&os.ModeDir == 0 {
-			s, err := os.Stat(item.Path)
+			var s os.FileInfo
+			s, err = os.Stat(item.Path)
 			if err != nil {
-				return 0, err
+				return
 			}
-			return s.Size(), nil
+			sizeS.totalSize = s.Size()
+			return
 		}
-		return 0, nil
+		return
 	}

 	want, err := crawlDataFolder(context.Background(), base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize)
 	if err != nil {
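All three test stubs populate only totalSize, since plain files on disk carry no replication state. A stub that wanted to exercise the new counters could classify by path instead; a hypothetical variant (not part of this commit, and assuming "strings" is added to the test imports):

	getSize := func(item crawlItem) (sizeS sizeSummary, err error) {
		if item.Typ&os.ModeDir != 0 {
			return
		}
		s, err := os.Stat(item.Path)
		if err != nil {
			return
		}
		sizeS.totalSize = s.Size()
		// Hypothetical rule: count anything under a "replica" directory
		// as replica capacity in addition to the total.
		if strings.Contains(item.Path, "/replica/") {
			sizeS.replicaSize = s.Size()
		}
		return
	}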

@@ -317,11 +317,11 @@ func (fs *FSObjects) crawlBucket(ctx context.Context, bucket string, cache dataUsageCache)
 	}

 	// Load bucket info.
-	cache, err = crawlDataFolder(ctx, fs.fsPath, cache, func(item crawlItem) (int64, error) {
+	cache, err = crawlDataFolder(ctx, fs.fsPath, cache, func(item crawlItem) (sizeSummary, error) {
 		bucket, object := item.bucket, item.objectPath()
 		fsMetaBytes, err := ioutil.ReadFile(pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile))
 		if err != nil && !osIsNotExist(err) {
-			return 0, errSkipFile
+			return sizeSummary{}, errSkipFile
 		}

 		fsMeta := newFSMetaV1()
@@ -339,7 +339,7 @@ func (fs *FSObjects) crawlBucket(ctx context.Context, bucket string, cache dataUsageCache)
 		// Stat the file.
 		fi, fiErr := os.Stat(item.Path)
 		if fiErr != nil {
-			return 0, errSkipFile
+			return sizeSummary{}, errSkipFile
 		}

 		// We cannot heal in FS mode.
 		item.heal = false
@@ -347,10 +347,10 @@ func (fs *FSObjects) crawlBucket(ctx context.Context, bucket string, cache dataUsageCache)
 		oi := fsMeta.ToObjectInfo(bucket, object, fi)
 		sz := item.applyActions(ctx, fs, actionMeta{oi: oi})
 		if sz >= 0 {
-			return sz, nil
+			return sizeSummary{totalSize: sz}, nil
 		}

-		return fi.Size(), nil
+		return sizeSummary{totalSize: fi.Size()}, nil
 	})

 	return cache, err

@@ -417,6 +417,42 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
 			float64(usageInfo.ObjectsCount),
 			bucket,
 		)
+		ch <- prometheus.MustNewConstMetric(
+			prometheus.NewDesc(
+				prometheus.BuildFQName("bucket", "replication", "pending_size"),
+				"Total capacity pending to be replicated",
+				[]string{"bucket"}, nil),
+			prometheus.GaugeValue,
+			float64(usageInfo.ReplicationPendingSize),
+			bucket,
+		)
+		ch <- prometheus.MustNewConstMetric(
+			prometheus.NewDesc(
+				prometheus.BuildFQName("bucket", "replication", "failed_size"),
+				"Total capacity failed to replicate at least once",
+				[]string{"bucket"}, nil),
+			prometheus.GaugeValue,
+			float64(usageInfo.ReplicationFailedSize),
+			bucket,
+		)
+		ch <- prometheus.MustNewConstMetric(
+			prometheus.NewDesc(
+				prometheus.BuildFQName("bucket", "replication", "successful_size"),
+				"Total capacity replicated to destination",
+				[]string{"bucket"}, nil),
+			prometheus.GaugeValue,
+			float64(usageInfo.ReplicatedSize),
+			bucket,
+		)
+		ch <- prometheus.MustNewConstMetric(
+			prometheus.NewDesc(
+				prometheus.BuildFQName("bucket", "replication", "received_size"),
+				"Total capacity replicated to this instance",
+				[]string{"bucket"}, nil),
+			prometheus.GaugeValue,
+			float64(usageInfo.ReplicaSize),
+			bucket,
+		)
 		for k, v := range usageInfo.ObjectSizesHistogram {
 			ch <- prometheus.MustNewConstMetric(
 				prometheus.NewDesc(
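Since BuildFQName joins its parts with underscores, the four new gauges surface on MinIO's Prometheus endpoint as bucket_replication_* series, labeled per bucket. A scrape would contain lines roughly like the following (bucket name and values are illustrative):

# HELP bucket_replication_pending_size Total capacity pending to be replicated
# TYPE bucket_replication_pending_size gauge
bucket_replication_pending_size{bucket="mybucket"} 1.048576e+06
bucket_replication_failed_size{bucket="mybucket"} 0
bucket_replication_successful_size{bucket="mybucket"} 5.24288e+07
bucket_replication_received_size{bucket="mybucket"} 0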

@@ -94,6 +94,10 @@ var ObjectsHistogramIntervals = []objectHistogramInterval{
 // - object size histogram per bucket
 type BucketUsageInfo struct {
 	Size uint64 `json:"size"`
+	ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"`
+	ReplicationFailedSize  uint64 `json:"objectsFailedReplicationTotalSize"`
+	ReplicatedSize         uint64 `json:"objectsReplicatedTotalSize"`
+	ReplicaSize            uint64 `json:"objectReplicaTotalSize"`
 	ObjectsCount         uint64            `json:"objectsCount"`
 	ObjectSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"`
 }
@@ -110,6 +114,18 @@ type DataUsageInfo struct {
 	// Objects total size across all buckets
 	ObjectsTotalSize uint64 `json:"objectsTotalSize"`

+	// Total size for objects that have not yet been replicated
+	ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"`
+
+	// Total size for objects that have witnessed one or more failures and will be retried
+	ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"`
+
+	// Total size for objects that have been replicated to the destination
+	ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"`
+
+	// Total size for objects that are replicas
+	ReplicaSize uint64 `json:"objectsReplicaTotalSize"`
+
 	// Total number of buckets in this cluster
 	BucketsCount uint64 `json:"bucketsCount"`
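These fields ride along in the data usage JSON that the admin API serves (and that storeDataUsageInBackend persists) using the tags above. Note the per-bucket replica tag is objectReplicaTotalSize (no "s") while the cluster-level tag is objectsReplicaTotalSize, exactly as the two struct definitions spell them. A cluster-level response fragment would look roughly like this (values illustrative):

{
  "objectsTotalSize": 52428800,
  "objectsPendingReplicationTotalSize": 1048576,
  "objectsFailedReplicationTotalSize": 0,
  "objectsReplicatedTotalSize": 52428800,
  "objectsReplicaTotalSize": 0,
  "bucketsCount": 3
}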

@@ -173,7 +173,6 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r *http.Request) {
 	done := keepHTTPResponseAlive(w)
 	usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache)
 	done(err)

 	if err != nil {
 		return

@@ -341,17 +341,17 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache)
 	healOpts := globalHealConfig
 	globalHealConfigMu.Unlock()

-	dataUsageInfo, err := crawlDataFolder(ctx, s.diskPath, cache, func(item crawlItem) (int64, error) {
+	dataUsageInfo, err := crawlDataFolder(ctx, s.diskPath, cache, func(item crawlItem) (sizeSummary, error) {
 		// Look for `xl.meta/xl.json` at the leaf.
 		if !strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFile) &&
 			!strings.HasSuffix(item.Path, SlashSeparator+xlStorageFormatFileV1) {
 			// if no xl.meta/xl.json found, skip the file.
-			return 0, errSkipFile
+			return sizeSummary{}, errSkipFile
 		}

 		buf, err := ioutil.ReadFile(item.Path)
 		if err != nil {
-			return 0, errSkipFile
+			return sizeSummary{}, errSkipFile
 		}

 		// Remove filename which is the meta file.
@@ -359,12 +359,13 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache)
 		fivs, err := getFileInfoVersions(buf, item.bucket, item.objectPath())
 		if err != nil {
-			return 0, errSkipFile
+			return sizeSummary{}, errSkipFile
 		}

 		var totalSize int64
 		var numVersions = len(fivs.Versions)
+		sizeS := sizeSummary{}
 		for i, version := range fivs.Versions {
 			var successorModTime time.Time
 			if i > 0 {
@@ -395,9 +396,10 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache)
 				}
 				totalSize += size
 			}
-			item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())})
+			item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())}, &sizeS)
 		}
-		return totalSize, nil
+		sizeS.totalSize = totalSize
+		return sizeS, nil
 	})

 	if err != nil {
