From c19e6ce7732d738c7f5367dc65b6c201d4a19800 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 26 Dec 2020 22:58:06 -0800 Subject: [PATCH] avoid a crash in crawler when lifecycle is not initialized (#11170) Bonus for static buffers use bytes.NewReader instead of bytes.NewBuffer, to use a more reader friendly implementation --- cmd/admin-handlers_test.go | 2 +- cmd/benchmark-utils_test.go | 10 ++--- cmd/bucket-handlers_test.go | 4 +- cmd/data-crawler.go | 2 +- cmd/data-update-tracker_test.go | 2 +- cmd/dummy-data-generator_test.go | 8 ++-- cmd/erasure-common.go | 60 ++++++++++++++++++++++++++ cmd/erasure.go | 17 ++++---- cmd/handler-utils_test.go | 4 +- cmd/metacache-stream_test.go | 2 +- cmd/notification.go | 12 +++--- cmd/object-api-deleteobject_test.go | 4 +- cmd/object-handlers_test.go | 16 +++---- cmd/storage-datatypes.go | 21 +++++----- cmd/storage-datatypes_gen.go | 35 +++++++++++++--- cmd/xl-storage.go | 65 ++++++++++++++++------------- 16 files changed, 177 insertions(+), 87 deletions(-) diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index f2183ff2e..23f69ef21 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -364,7 +364,7 @@ func TestExtractHealInitParams(t *testing.T) { // Test all combinations! for pIdx, parms := range qParmsArr { for vIdx, vars := range varsArr { - _, err := extractHealInitParams(vars, parms, bytes.NewBuffer([]byte(body))) + _, err := extractHealInitParams(vars, parms, bytes.NewReader([]byte(body))) isErrCase := false if pIdx < 4 || vIdx < 1 { isErrCase = true diff --git a/cmd/benchmark-utils_test.go b/cmd/benchmark-utils_test.go index 92cc4a578..081448468 100644 --- a/cmd/benchmark-utils_test.go +++ b/cmd/benchmark-utils_test.go @@ -55,7 +55,7 @@ func runPutObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) { for i := 0; i < b.N; i++ { // insert the object. objInfo, err := obj.PutObject(context.Background(), bucket, "object"+strconv.Itoa(i), - mustGetPutObjReader(b, bytes.NewBuffer(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{}) + mustGetPutObjReader(b, bytes.NewReader(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{}) if err != nil { b.Fatal(err) } @@ -114,7 +114,7 @@ func runPutObjectPartBenchmark(b *testing.B, obj ObjectLayer, partSize int) { md5hex := getMD5Hash([]byte(textPartData)) var partInfo PartInfo partInfo, err = obj.PutObjectPart(context.Background(), bucket, object, uploadID, j, - mustGetPutObjReader(b, bytes.NewBuffer(textPartData), int64(len(textPartData)), md5hex, sha256hex), ObjectOptions{}) + mustGetPutObjReader(b, bytes.NewReader(textPartData), int64(len(textPartData)), md5hex, sha256hex), ObjectOptions{}) if err != nil { b.Fatal(err) } @@ -200,7 +200,7 @@ func runGetObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) { // insert the object. var objInfo ObjectInfo objInfo, err = obj.PutObject(context.Background(), bucket, "object"+strconv.Itoa(i), - mustGetPutObjReader(b, bytes.NewBuffer(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{}) + mustGetPutObjReader(b, bytes.NewReader(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{}) if err != nil { b.Fatal(err) } @@ -301,7 +301,7 @@ func runPutObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) { for pb.Next() { // insert the object. objInfo, err := obj.PutObject(context.Background(), bucket, "object"+strconv.Itoa(i), - mustGetPutObjReader(b, bytes.NewBuffer(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{}) + mustGetPutObjReader(b, bytes.NewReader(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{}) if err != nil { b.Fatal(err) } @@ -340,7 +340,7 @@ func runGetObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) { // insert the object. var objInfo ObjectInfo objInfo, err = obj.PutObject(context.Background(), bucket, "object"+strconv.Itoa(i), - mustGetPutObjReader(b, bytes.NewBuffer(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{}) + mustGetPutObjReader(b, bytes.NewReader(textData), int64(len(textData)), md5hex, sha256hex), ObjectOptions{}) if err != nil { b.Fatal(err) } diff --git a/cmd/bucket-handlers_test.go b/cmd/bucket-handlers_test.go index 70c5bdd44..83b1458cb 100644 --- a/cmd/bucket-handlers_test.go +++ b/cmd/bucket-handlers_test.go @@ -35,7 +35,7 @@ func TestRemoveBucketHandler(t *testing.T) { func testRemoveBucketHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler, credentials auth.Credentials, t *testing.T) { - _, err := obj.PutObject(GlobalContext, bucketName, "test-object", mustGetPutObjReader(t, bytes.NewBuffer([]byte{}), int64(0), "", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"), ObjectOptions{}) + _, err := obj.PutObject(GlobalContext, bucketName, "test-object", mustGetPutObjReader(t, bytes.NewReader([]byte{}), int64(0), "", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"), ObjectOptions{}) // if object upload fails stop the test. if err != nil { t.Fatalf("Error uploading object: %v", err) @@ -669,7 +669,7 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa for i := 0; i < 10; i++ { objectName := "test-object-" + strconv.Itoa(i) // uploading the object. - _, err = obj.PutObject(GlobalContext, bucketName, objectName, mustGetPutObjReader(t, bytes.NewBuffer(contentBytes), int64(len(contentBytes)), "", sha256sum), ObjectOptions{}) + _, err = obj.PutObject(GlobalContext, bucketName, objectName, mustGetPutObjReader(t, bytes.NewReader(contentBytes), int64(len(contentBytes)), "", sha256sum), ObjectOptions{}) // if object upload fails stop the test. if err != nil { t.Fatalf("Put Object %d: Error uploading object: %v", i, err) diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index d199046d4..1ff245fac 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -219,7 +219,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, } if len(cache.Info.BloomFilter) > 0 { s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}} - _, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter)) + _, err := s.withFilter.ReadFrom(bytes.NewReader(cache.Info.BloomFilter)) if err != nil { logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter") s.withFilter = nil diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go index 4eaf38590..82bb74fd4 100644 --- a/cmd/data-update-tracker_test.go +++ b/cmd/data-update-tracker_test.go @@ -225,7 +225,7 @@ func TestDataUpdateTracker(t *testing.T) { // Rerun test with returned bfr2 bf := dut.newBloomFilter() - _, err = bf.ReadFrom(bytes.NewBuffer(bfr2.Filter)) + _, err = bf.ReadFrom(bytes.NewReader(bfr2.Filter)) if err != nil { t.Fatal(err) } diff --git a/cmd/dummy-data-generator_test.go b/cmd/dummy-data-generator_test.go index 052d852f5..4fc02073d 100644 --- a/cmd/dummy-data-generator_test.go +++ b/cmd/dummy-data-generator_test.go @@ -163,8 +163,8 @@ func cmpReaders(r1, r2 io.Reader) (bool, string) { func TestCmpReaders(t *testing.T) { { - r1 := bytes.NewBuffer([]byte("abc")) - r2 := bytes.NewBuffer([]byte("abc")) + r1 := bytes.NewReader([]byte("abc")) + r2 := bytes.NewReader([]byte("abc")) ok, msg := cmpReaders(r1, r2) if !(ok && msg == "") { t.Fatalf("unexpected") @@ -172,8 +172,8 @@ func TestCmpReaders(t *testing.T) { } { - r1 := bytes.NewBuffer([]byte("abc")) - r2 := bytes.NewBuffer([]byte("abcd")) + r1 := bytes.NewReader([]byte("abc")) + r2 := bytes.NewReader([]byte("abcd")) ok, _ := cmpReaders(r1, r2) if ok { t.Fatalf("unexpected") diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 3591d9d7e..02252f508 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -19,6 +19,7 @@ package cmd import ( "context" "path" + "sort" "sync" "github.com/minio/minio/pkg/sync/errgroup" @@ -37,6 +38,65 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) { return newDisks } +type sortSlices struct { + disks []StorageAPI + infos []DiskInfo +} + +type sortByOther sortSlices + +func (sbo sortByOther) Len() int { + return len(sbo.disks) +} + +func (sbo sortByOther) Swap(i, j int) { + sbo.disks[i], sbo.disks[j] = sbo.disks[j], sbo.disks[i] + sbo.infos[i], sbo.infos[j] = sbo.infos[j], sbo.infos[i] +} + +func (sbo sortByOther) Less(i, j int) bool { + return sbo.infos[i].UsedInodes < sbo.infos[j].UsedInodes +} + +func (er erasureObjects) getOnlineDisksSortedByUsedInodes() (newDisks []StorageAPI) { + disks := er.getDisks() + var wg sync.WaitGroup + var mu sync.Mutex + var infos []DiskInfo + for _, i := range hashOrder(UTCNow().String(), len(disks)) { + i := i + wg.Add(1) + go func() { + defer wg.Done() + if disks[i-1] == nil { + return + } + di, err := disks[i-1].DiskInfo(context.Background()) + if err != nil || di.Healing { + + // - Do not consume disks which are not reachable + // unformatted or simply not accessible for some reason. + // + // - Do not consume disks which are being healed + // + // - Future: skip busy disks + return + } + + mu.Lock() + newDisks = append(newDisks, disks[i-1]) + infos = append(infos, di) + mu.Unlock() + }() + } + wg.Wait() + + slices := sortSlices{newDisks, infos} + sort.Sort(sortByOther(slices)) + + return slices.disks +} + func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) { disks := er.getDisks() var wg sync.WaitGroup diff --git a/cmd/erasure.go b/cmd/erasure.go index e2ba9bbd8..b5d2d7357 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -245,8 +245,8 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc return nil } - // Collect disks we can use. - disks := er.getOnlineDisks() + // Collect disks we can use, sorted by least inode usage. + disks := er.getOnlineDisksSortedByUsedInodes() if len(disks) == 0 { logger.Info(color.Green("data-crawl:") + " all disks are offline or being healed, skipping crawl") return nil @@ -312,7 +312,6 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc defer saverWg.Done() var lastSave time.Time - saveLoop: for { select { case <-ctx.Done(): @@ -327,17 +326,17 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc lastSave = cache.Info.LastUpdate case v, ok := <-bucketResults: if !ok { - break saveLoop + // Save final state... + cache.Info.NextCycle++ + cache.Info.LastUpdate = time.Now() + logger.LogIf(ctx, cache.save(ctx, er, dataUsageCacheName)) + updates <- cache + return } cache.replace(v.Name, v.Parent, v.Entry) cache.Info.LastUpdate = time.Now() } } - // Save final state... - cache.Info.NextCycle++ - cache.Info.LastUpdate = time.Now() - logger.LogIf(ctx, cache.save(ctx, er, dataUsageCacheName)) - updates <- cache }() // Start one crawler per disk diff --git a/cmd/handler-utils_test.go b/cmd/handler-utils_test.go index a519cbad1..505e03000 100644 --- a/cmd/handler-utils_test.go +++ b/cmd/handler-utils_test.go @@ -43,7 +43,7 @@ func TestIsValidLocationContraint(t *testing.T) { // Corrupted XML malformedReq := &http.Request{ - Body: ioutil.NopCloser(bytes.NewBuffer([]byte("<>"))), + Body: ioutil.NopCloser(bytes.NewReader([]byte("<>"))), ContentLength: int64(len("<>")), } @@ -58,7 +58,7 @@ func TestIsValidLocationContraint(t *testing.T) { createBucketConfig := createBucketLocationConfiguration{} createBucketConfig.Location = location createBucketConfigBytes, _ := xml.Marshal(createBucketConfig) - createBucketConfigBuffer := bytes.NewBuffer(createBucketConfigBytes) + createBucketConfigBuffer := bytes.NewReader(createBucketConfigBytes) req.Body = ioutil.NopCloser(createBucketConfigBuffer) req.ContentLength = int64(createBucketConfigBuffer.Len()) return req diff --git a/cmd/metacache-stream_test.go b/cmd/metacache-stream_test.go index 6525b4e04..063e985fa 100644 --- a/cmd/metacache-stream_test.go +++ b/cmd/metacache-stream_test.go @@ -33,7 +33,7 @@ func loadMetacacheSample(t testing.TB) *metacacheReader { if err != nil { t.Fatal(err) } - r, err := newMetacacheReader(bytes.NewBuffer(b)) + r, err := newMetacacheReader(bytes.NewReader(b)) if err != nil { t.Fatal(err) } diff --git a/cmd/notification.go b/cmd/notification.go index 6739cd3f4..cc3af098e 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -342,7 +342,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io logger.LogIf(ctx, zerr) continue } - if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { + if _, err = io.Copy(zwriter, bytes.NewReader(data)); err != nil { reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) logger.LogIf(ctx, err) @@ -387,7 +387,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io return profilingDataFound } - if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { + if _, err = io.Copy(zwriter, bytes.NewReader(data)); err != nil { return profilingDataFound } } @@ -443,7 +443,7 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6 if err == nil && bfr.Complete { nbf := intDataUpdateTracker.newBloomFilter() bf = &nbf - _, err = bf.ReadFrom(bytes.NewBuffer(bfr.Filter)) + _, err = bf.ReadFrom(bytes.NewReader(bfr.Filter)) logger.LogIf(ctx, err) } @@ -471,7 +471,7 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6 } var tmp bloom.BloomFilter - _, err = tmp.ReadFrom(bytes.NewBuffer(serverBF.Filter)) + _, err = tmp.ReadFrom(bytes.NewReader(serverBF.Filter)) if err != nil { logger.LogIf(ctx, err) bf = nil @@ -508,7 +508,7 @@ func (sys *NotificationSys) collectBloomFilter(ctx context.Context, from uint64) if err == nil && bfr.Complete { nbf := intDataUpdateTracker.newBloomFilter() bf = &nbf - _, err = bf.ReadFrom(bytes.NewBuffer(bfr.Filter)) + _, err = bf.ReadFrom(bytes.NewReader(bfr.Filter)) logger.LogIf(ctx, err) } if !bfr.Complete { @@ -540,7 +540,7 @@ func (sys *NotificationSys) collectBloomFilter(ctx context.Context, from uint64) } var tmp bloom.BloomFilter - _, err = tmp.ReadFrom(bytes.NewBuffer(serverBF.Filter)) + _, err = tmp.ReadFrom(bytes.NewReader(serverBF.Filter)) if err != nil { logger.LogIf(ctx, err) bf = nil diff --git a/cmd/object-api-deleteobject_test.go b/cmd/object-api-deleteobject_test.go index fbf537b80..deefdbbf9 100644 --- a/cmd/object-api-deleteobject_test.go +++ b/cmd/object-api-deleteobject_test.go @@ -17,10 +17,10 @@ package cmd import ( - "bytes" "context" "crypto/md5" "encoding/hex" + "strings" "testing" ) @@ -91,7 +91,7 @@ func testDeleteObject(obj ObjectLayer, instanceType string, t TestErrHandler) { for _, object := range testCase.objectToUploads { md5Bytes := md5.Sum([]byte(object.content)) - _, err = obj.PutObject(context.Background(), testCase.bucketName, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), + _, err = obj.PutObject(context.Background(), testCase.bucketName, object.name, mustGetPutObjReader(t, strings.NewReader(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{}) if err != nil { t.Fatalf("%s : %s", instanceType, err.Error()) diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go index 0483906e3..4c111dbb2 100644 --- a/cmd/object-handlers_test.go +++ b/cmd/object-handlers_test.go @@ -85,7 +85,7 @@ func testAPIHeadObjectHandler(obj ObjectLayer, instanceType, bucketName string, // iterate through the above set of inputs and upload the object. for i, input := range putObjectInputs { // uploading the object. - _, err := obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) + _, err := obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) // if object upload fails stop the test. if err != nil { t.Fatalf("Put Object case %d: Error uploading object: %v", i+1, err) @@ -357,7 +357,7 @@ func testAPIGetObjectHandler(obj ObjectLayer, instanceType, bucketName string, a // iterate through the above set of inputs and upload the object. for i, input := range putObjectInputs { // uploading the object. - _, err := obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) + _, err := obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) // if object upload fails stop the test. if err != nil { t.Fatalf("Put Object case %d: Error uploading object: %v", i+1, err) @@ -1571,7 +1571,7 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam for i, input := range putObjectInputs { // uploading the object. _, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, - mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) + mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) // if object upload fails stop the test. if err != nil { t.Fatalf("Put Object case %d: Error uploading object: %v", i+1, err) @@ -1681,7 +1681,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri // iterate through the above set of inputs and upload the object. for i, input := range putObjectInputs { // uploading the object. - _, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) + _, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) // if object upload fails stop the test. if err != nil { t.Fatalf("Put Object case %d: Error uploading object: %v", i+1, err) @@ -2018,7 +2018,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, for i, input := range putObjectInputs { // uploading the object. var objInfo ObjectInfo - objInfo, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.md5sum, ""), ObjectOptions{UserDefined: input.metaData}) + objInfo, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.md5sum, ""), ObjectOptions{UserDefined: input.metaData}) // if object upload fails stop the test. if err != nil { t.Fatalf("Put Object case %d: Error uploading object: %v", i+1, err) @@ -2669,7 +2669,7 @@ func testAPICompleteMultipartHandler(obj ObjectLayer, instanceType, bucketName s // Iterating over creatPartCases to generate multipart chunks. for _, part := range parts { _, err = obj.PutObjectPart(context.Background(), part.bucketName, part.objName, part.uploadID, part.PartID, - mustGetPutObjReader(t, bytes.NewBufferString(part.inputReaderData), part.intputDataSize, part.inputMd5, ""), opts) + mustGetPutObjReader(t, strings.NewReader(part.inputReaderData), part.intputDataSize, part.inputMd5, ""), opts) if err != nil { t.Fatalf("%s : %s", instanceType, err) } @@ -3040,7 +3040,7 @@ func testAPIAbortMultipartHandler(obj ObjectLayer, instanceType, bucketName stri // Iterating over createPartCases to generate multipart chunks. for _, part := range parts { _, err = obj.PutObjectPart(context.Background(), part.bucketName, part.objName, part.uploadID, part.PartID, - mustGetPutObjReader(t, bytes.NewBufferString(part.inputReaderData), part.intputDataSize, part.inputMd5, ""), opts) + mustGetPutObjReader(t, strings.NewReader(part.inputReaderData), part.intputDataSize, part.inputMd5, ""), opts) if err != nil { t.Fatalf("%s : %s", instanceType, err) } @@ -3177,7 +3177,7 @@ func testAPIDeleteObjectHandler(obj ObjectLayer, instanceType, bucketName string // iterate through the above set of inputs and upload the object. for i, input := range putObjectInputs { // uploading the object. - _, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) + _, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewReader(input.textData), input.contentLength, input.metaData[""], ""), ObjectOptions{UserDefined: input.metaData}) // if object upload fails stop the test. if err != nil { t.Fatalf("Put Object case %d: Error uploading object: %v", i+1, err) diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 3890264cf..2e75d87b4 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -25,16 +25,17 @@ import ( // DiskInfo is an extended type which returns current // disk usage per path. type DiskInfo struct { - Total uint64 - Free uint64 - Used uint64 - FSType string - RootDisk bool - Healing bool - Endpoint string - MountPath string - ID string - Error string // carries the error over the network + Total uint64 + Free uint64 + Used uint64 + UsedInodes uint64 + FSType string + RootDisk bool + Healing bool + Endpoint string + MountPath string + ID string + Error string // carries the error over the network } // VolsInfo is a collection of volume(bucket) information diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 1f076dbbc..48404402b 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -42,6 +42,12 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Used") return } + case "UsedInodes": + z.UsedInodes, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "UsedInodes") + return + } case "FSType": z.FSType, err = dc.ReadString() if err != nil { @@ -97,9 +103,9 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 10 + // map header, size 11 // write "Total" - err = en.Append(0x8a, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + err = en.Append(0x8b, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) if err != nil { return } @@ -128,6 +134,16 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Used") return } + // write "UsedInodes" + err = en.Append(0xaa, 0x55, 0x73, 0x65, 0x64, 0x49, 0x6e, 0x6f, 0x64, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.UsedInodes) + if err != nil { + err = msgp.WrapError(err, "UsedInodes") + return + } // write "FSType" err = en.Append(0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65) if err != nil { @@ -204,9 +220,9 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 10 + // map header, size 11 // string "Total" - o = append(o, 0x8a, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + o = append(o, 0x8b, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) o = msgp.AppendUint64(o, z.Total) // string "Free" o = append(o, 0xa4, 0x46, 0x72, 0x65, 0x65) @@ -214,6 +230,9 @@ func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) { // string "Used" o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x64) o = msgp.AppendUint64(o, z.Used) + // string "UsedInodes" + o = append(o, 0xaa, 0x55, 0x73, 0x65, 0x64, 0x49, 0x6e, 0x6f, 0x64, 0x65, 0x73) + o = msgp.AppendUint64(o, z.UsedInodes) // string "FSType" o = append(o, 0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65) o = msgp.AppendString(o, z.FSType) @@ -274,6 +293,12 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Used") return } + case "UsedInodes": + z.UsedInodes, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "UsedInodes") + return + } case "FSType": z.FSType, bts, err = msgp.ReadStringBytes(bts) if err != nil { @@ -330,7 +355,7 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *DiskInfo) Msgsize() (s int) { - s = 1 + 6 + msgp.Uint64Size + 5 + msgp.Uint64Size + 5 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.FSType) + 9 + msgp.BoolSize + 8 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 10 + msgp.StringPrefixSize + len(z.MountPath) + 3 + msgp.StringPrefixSize + len(z.ID) + 6 + msgp.StringPrefixSize + len(z.Error) + s = 1 + 6 + msgp.Uint64Size + 5 + msgp.Uint64Size + 5 + msgp.Uint64Size + 11 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.FSType) + 9 + msgp.BoolSize + 8 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 10 + msgp.StringPrefixSize + len(z.MountPath) + 3 + msgp.StringPrefixSize + len(z.ID) + 6 + msgp.StringPrefixSize + len(z.Error) return } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 0610fb573..b37b77ae8 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -332,20 +332,22 @@ func (s *xlStorage) Healing() bool { } func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { + var lc *lifecycle.Lifecycle + var err error + // Check if the current bucket has a configured lifecycle policy - lc, err := globalLifecycleSys.Get(cache.Info.Name) - if err == nil && lc.HasActiveRules("", true) { - cache.Info.lifeCycle = lc - if intDataUpdateTracker.debug { - logger.Info(color.Green("crawlDisk:") + " lifecycle: Active rules found") + if globalLifecycleSys != nil { + lc, err = globalLifecycleSys.Get(cache.Info.Name) + if err == nil && lc.HasActiveRules("", true) { + cache.Info.lifeCycle = lc + if intDataUpdateTracker.debug { + logger.Info(color.Green("crawlDisk:") + " lifecycle: Active rules found") + } } } - // Get object api + // return initialized object layer objAPI := newObjectLayerFn() - if objAPI == nil { - return cache, errServerNotInitialized - } globalHealConfigMu.Lock() healOpts := globalHealConfig @@ -388,31 +390,33 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac successorModTime = fivs.Versions[i-1].ModTime } oi := version.ToObjectInfo(item.bucket, item.objectPath()) - size := item.applyActions(ctx, objAPI, actionMeta{ - numVersions: numVersions, - successorModTime: successorModTime, - oi: oi, - }) - if !version.Deleted { - // Bitrot check local data - if size > 0 && item.heal && healOpts.Bitrot { - // HealObject verifies bitrot requirement internally - res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{ - Remove: healDeleteDangling, - ScanMode: madmin.HealDeepScan, - }) - if err != nil { - if !errors.Is(err, NotImplemented{}) { - logger.LogIf(ctx, err) + if objAPI != nil { + size := item.applyActions(ctx, objAPI, actionMeta{ + numVersions: numVersions, + successorModTime: successorModTime, + oi: oi, + }) + if !version.Deleted { + // Bitrot check local data + if size > 0 && item.heal && healOpts.Bitrot { + // HealObject verifies bitrot requirement internally + res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{ + Remove: healDeleteDangling, + ScanMode: madmin.HealDeepScan, + }) + if err != nil { + if !errors.Is(err, NotImplemented{}) { + logger.LogIf(ctx, err) + } + size = 0 + } else { + size = res.ObjectSize } - size = 0 - } else { - size = res.ObjectSize } + totalSize += size } - totalSize += size + item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())}, &sizeS) } - item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())}, &sizeS) } sizeS.totalSize = totalSize return sizeS, nil @@ -449,6 +453,7 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) { dcinfo.Total = di.Total dcinfo.Free = di.Free dcinfo.Used = di.Used + dcinfo.UsedInodes = di.Files - di.Ffree dcinfo.FSType = di.FSType diskID, err := s.GetDiskID()