diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 48c3cd38c..dbc1d7fab 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -88,11 +88,6 @@ type erasureSets struct { disksStorageInfoCache timedValue - // Merge tree walk - pool *MergeWalkPool - poolSplunk *MergeWalkPool - poolVersions *MergeWalkVersionsPool - mrfMU sync.Mutex mrfOperations map[healSource]int } @@ -356,9 +351,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto disksConnectEvent: make(chan diskConnectInfo), distributionAlgo: format.Erasure.DistributionAlgo, deploymentID: uuid.MustParse(format.ID), - pool: NewMergeWalkPool(globalMergeLookupTimeout), - poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout), - poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout), mrfOperations: make(map[healSource]int), } @@ -926,10 +918,6 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI return lentry, lexicallySortedEntryCount, isTruncated } -func (s *erasureSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh { - return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1, false) -} - func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh { return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1) } @@ -964,42 +952,6 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref return entryChs } -// Starts a walk channel across n number of disks and returns a slice of -// FileInfoCh which can be read from. -func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int, splunk bool) []FileInfoCh { - var entryChs []FileInfoCh - var wg sync.WaitGroup - var mutex sync.Mutex - for _, set := range s.sets { - // Reset for the next erasure set. - for _, disk := range set.getLoadBalancedNDisks(ndisks) { - wg.Add(1) - go func(disk StorageAPI) { - defer wg.Done() - - var entryCh chan FileInfo - var err error - if splunk { - entryCh, err = disk.WalkSplunk(GlobalContext, bucket, prefix, marker, endWalkCh) - } else { - entryCh, err = disk.Walk(GlobalContext, bucket, prefix, marker, recursive, endWalkCh) - } - if err != nil { - // Disk walk returned error, ignore it. - return - } - mutex.Lock() - entryChs = append(entryChs, FileInfoCh{ - Ch: entryCh, - }) - mutex.Unlock() - }(disk) - } - } - wg.Wait() - return entryChs -} - func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { // In list multipart uploads we are going to treat input prefix as the object, // this means that we are not supporting directory navigation. diff --git a/cmd/tree-walk-pool.go b/cmd/fs-tree-walk-pool.go similarity index 100% rename from cmd/tree-walk-pool.go rename to cmd/fs-tree-walk-pool.go diff --git a/cmd/tree-walk-pool_test.go b/cmd/fs-tree-walk-pool_test.go similarity index 100% rename from cmd/tree-walk-pool_test.go rename to cmd/fs-tree-walk-pool_test.go diff --git a/cmd/merge-walk-pool.go b/cmd/merge-walk-pool.go deleted file mode 100644 index 624409ee7..000000000 --- a/cmd/merge-walk-pool.go +++ /dev/null @@ -1,353 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2019, 2020 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "reflect" - "sync" - "time" -) - -const ( - globalMergeLookupTimeout = time.Minute * 1 // 1 minutes. -) - -// mergeWalkVersions - represents the go routine that does the merge walk versions. -type mergeWalkVersions struct { - added time.Time - entryChs []FileInfoVersionsCh - endWalkCh chan struct{} // To signal when mergeWalk go-routine should end. - endTimerCh chan<- struct{} // To signal when timer go-routine should end. -} - -// mergeWalk - represents the go routine that does the merge walk. -type mergeWalk struct { - added time.Time - entryChs []FileInfoCh - endWalkCh chan struct{} // To signal when mergeWalk go-routine should end. - endTimerCh chan<- struct{} // To signal when timer go-routine should end. -} - -// MergeWalkVersionsPool - pool of mergeWalk go routines. -// A mergeWalk is added to the pool by Set() and removed either by -// doing a Release() or if the concerned timer goes off. -// mergeWalkPool's purpose is to maintain active mergeWalk go-routines in a map so that -// it can be looked up across related list calls. -type MergeWalkVersionsPool struct { - sync.Mutex - pool map[listParams][]mergeWalkVersions - timeOut time.Duration -} - -// NewMergeWalkVersionsPool - initialize new tree walk pool for versions. -func NewMergeWalkVersionsPool(timeout time.Duration) *MergeWalkVersionsPool { - tPool := &MergeWalkVersionsPool{ - pool: make(map[listParams][]mergeWalkVersions), - timeOut: timeout, - } - return tPool -} - -// Release - similar to mergeWalkPool.Release but for versions. -func (t *MergeWalkVersionsPool) Release(params listParams) ([]FileInfoVersionsCh, chan struct{}) { - t.Lock() - defer t.Unlock() - walks, ok := t.pool[params] // Pick the valid walks. - if !ok || len(walks) == 0 { - // Release return nil if params not found. - return nil, nil - } - - // Pop out the first valid walk entry. - walk := walks[0] - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks - } else { - delete(t.pool, params) - } - walk.endTimerCh <- struct{}{} - return walk.entryChs, walk.endWalkCh -} - -// Set - similar to mergeWalkPool.Set but for file versions -func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersionsCh, endWalkCh chan struct{}) { - t.Lock() - defer t.Unlock() - - // If we are above the limit delete at least one entry from the pool. - if len(t.pool) > treeWalkEntryLimit { - age := time.Now() - var oldest listParams - for k, v := range t.pool { - if len(v) == 0 { - delete(t.pool, k) - continue - } - // The first element is the oldest, so we only check that. - e := v[0] - if e.added.Before(age) { - oldest = k - age = e.added - } - } - // Invalidate and delete oldest. - if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { - endCh := walks[0].endTimerCh - endWalkCh := walks[0].endWalkCh - if len(walks) > 1 { - // Move walks forward - copy(walks, walks[1:]) - walks = walks[:len(walks)-1] - t.pool[oldest] = walks - } else { - // Only entry, just delete. - delete(t.pool, oldest) - } - select { - case endCh <- struct{}{}: - close(endWalkCh) - default: - } - } else { - // Shouldn't happen, but just in case. - delete(t.pool, oldest) - } - } - - // Should be a buffered channel so that Release() never blocks. - endTimerCh := make(chan struct{}, 1) - - walkInfo := mergeWalkVersions{ - added: UTCNow(), - entryChs: resultChs, - endWalkCh: endWalkCh, - endTimerCh: endTimerCh, - } - - // Append new walk info. - walks := t.pool[params] - if len(walks) < treeWalkSameEntryLimit { - t.pool[params] = append(walks, walkInfo) - } else { - // We are at limit, invalidate oldest, move list down and add new as last. - select { - case walks[0].endTimerCh <- struct{}{}: - close(walks[0].endWalkCh) - default: - } - copy(walks, walks[1:]) - walks[len(walks)-1] = walkInfo - } - - // Timer go-routine which times out after t.timeOut seconds. - go func(endTimerCh <-chan struct{}, walkInfo mergeWalkVersions) { - select { - // Wait until timeOut - case <-time.After(t.timeOut): - // Timeout has expired. Remove the mergeWalk from mergeWalkPool and - // end the mergeWalk go-routine. - t.Lock() - defer t.Unlock() - walks, ok := t.pool[params] - if ok { - // Trick of filtering without allocating - // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating - nwalks := walks[:0] - // Look for walkInfo, remove it from the walks list. - for _, walk := range walks { - if !reflect.DeepEqual(walk, walkInfo) { - nwalks = append(nwalks, walk) - } - } - if len(nwalks) == 0 { - // No more mergeWalk go-routines associated with listParams - // hence remove map entry. - delete(t.pool, params) - } else { - // There are more mergeWalk go-routines associated with listParams - // hence save the list in the map. - t.pool[params] = nwalks - } - } - // Signal the mergeWalk go-routine to die. - close(endWalkCh) - case <-endTimerCh: - return - } - }(endTimerCh, walkInfo) -} - -// MergeWalkPool - pool of mergeWalk go routines. -// A mergeWalk is added to the pool by Set() and removed either by -// doing a Release() or if the concerned timer goes off. -// mergeWalkPool's purpose is to maintain active mergeWalk go-routines in a map so that -// it can be looked up across related list calls. -type MergeWalkPool struct { - sync.Mutex - pool map[listParams][]mergeWalk - timeOut time.Duration -} - -// NewMergeWalkPool - initialize new tree walk pool. -func NewMergeWalkPool(timeout time.Duration) *MergeWalkPool { - tPool := &MergeWalkPool{ - pool: make(map[listParams][]mergeWalk), - timeOut: timeout, - } - return tPool -} - -// Release - selects a mergeWalk from the pool based on the input -// listParams, removes it from the pool, and returns the MergeWalkResult -// channel. -// Returns nil if listParams does not have an associated mergeWalk. -func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) { - t.Lock() - defer t.Unlock() - walks, ok := t.pool[params] // Pick the valid walks. - if !ok || len(walks) == 0 { - // Release return nil if params not found. - return nil, nil - } - - // Pop out the first valid walk entry. - walk := walks[0] - walks[0] = mergeWalk{} // clear references. - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks - } else { - delete(t.pool, params) - } - walk.endTimerCh <- struct{}{} - return walk.entryChs, walk.endWalkCh -} - -// Set - adds a mergeWalk to the mergeWalkPool. -// Also starts a timer go-routine that ends when: -// 1) time.After() expires after t.timeOut seconds. -// The expiration is needed so that the mergeWalk go-routine resources are freed after a timeout -// if the S3 client does only partial listing of objects. -// 2) Release() signals the timer go-routine to end on endTimerCh. -// During listing the timer should not timeout and end the mergeWalk go-routine, hence the -// timer go-routine should be ended. -func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh chan struct{}) { - t.Lock() - defer t.Unlock() - - // If we are above the limit delete at least one entry from the pool. - if len(t.pool) > treeWalkEntryLimit { - age := time.Now() - var oldest listParams - for k, v := range t.pool { - if len(v) == 0 { - delete(t.pool, k) - continue - } - // The first element is the oldest, so we only check that. - e := v[0] - if e.added.Before(age) { - oldest = k - age = e.added - } - } - // Invalidate and delete oldest. - if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { - endCh := walks[0].endTimerCh - endWalkCh := walks[0].endWalkCh - if len(walks) > 1 { - // Move walks forward - copy(walks, walks[1:]) - walks = walks[:len(walks)-1] - t.pool[oldest] = walks - } else { - // Only entry, just delete. - delete(t.pool, oldest) - } - select { - case endCh <- struct{}{}: - close(endWalkCh) - default: - } - } else { - // Shouldn't happen, but just in case. - delete(t.pool, oldest) - } - } - - // Should be a buffered channel so that Release() never blocks. - endTimerCh := make(chan struct{}, 1) - walkInfo := mergeWalk{ - added: UTCNow(), - entryChs: resultChs, - endWalkCh: endWalkCh, - endTimerCh: endTimerCh, - } - - // Append new walk info. - walks := t.pool[params] - if len(walks) < treeWalkSameEntryLimit { - t.pool[params] = append(walks, walkInfo) - } else { - // We are at limit, invalidate oldest, move list down and add new as last. - select { - case walks[0].endTimerCh <- struct{}{}: - close(walks[0].endWalkCh) - default: - } - copy(walks, walks[1:]) - walks[len(walks)-1] = walkInfo - } - - // Timer go-routine which times out after t.timeOut seconds. - go func(endTimerCh <-chan struct{}, walkInfo mergeWalk) { - select { - // Wait until timeOut - case <-time.After(t.timeOut): - // Timeout has expired. Remove the mergeWalk from mergeWalkPool and - // end the mergeWalk go-routine. - t.Lock() - defer t.Unlock() - walks, ok := t.pool[params] - if ok { - // Trick of filtering without allocating - // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating - nwalks := walks[:0] - // Look for walkInfo, remove it from the walks list. - for _, walk := range walks { - if !reflect.DeepEqual(walk, walkInfo) { - nwalks = append(nwalks, walk) - } - } - if len(nwalks) == 0 { - // No more mergeWalk go-routines associated with listParams - // hence remove map entry. - delete(t.pool, params) - } else { - // There are more mergeWalk go-routines associated with listParams - // hence save the list in the map. - t.pool[params] = nwalks - } - } - // Signal the mergeWalk go-routine to die. - close(endWalkCh) - case <-endTimerCh: - return - } - }(endTimerCh, walkInfo) -} diff --git a/cmd/merge-walk-pool_test.go b/cmd/merge-walk-pool_test.go deleted file mode 100644 index 7718b6ab9..000000000 --- a/cmd/merge-walk-pool_test.go +++ /dev/null @@ -1,185 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2019,2020 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "testing" - "time" -) - -// Test if tree walker go-routine is removed from the pool after timeout -// and that is available in the pool before the timeout. -func TestMergeWalkPoolVersionsBasic(t *testing.T) { - // Create a treeWalkPool - tw := NewMergeWalkVersionsPool(1 * time.Second) - - // Create sample params - params := listParams{ - bucket: "test-bucket", - } - - endWalkCh := make(chan struct{}) - // Add a treeWalk to the pool - tw.Set(params, []FileInfoVersionsCh{}, endWalkCh) - - // Wait for treeWalkPool timeout to happen - <-time.After(2 * time.Second) - if c1, _ := tw.Release(params); c1 != nil { - t.Error("treeWalk go-routine must have been freed") - } - - // Add the treeWalk back to the pool - endWalkCh = make(chan struct{}) - tw.Set(params, []FileInfoVersionsCh{}, endWalkCh) - - // Release the treeWalk before timeout - select { - case <-time.After(1 * time.Second): - break - default: - if c1, _ := tw.Release(params); c1 == nil { - t.Error("treeWalk go-routine got freed before timeout") - } - } -} - -// Test if tree walker go-routine is removed from the pool after timeout -// and that is available in the pool before the timeout. -func TestMergeWalkPoolBasic(t *testing.T) { - // Create a treeWalkPool - tw := NewMergeWalkPool(1 * time.Second) - - // Create sample params - params := listParams{ - bucket: "test-bucket", - } - - endWalkCh := make(chan struct{}) - // Add a treeWalk to the pool - tw.Set(params, []FileInfoCh{}, endWalkCh) - - // Wait for treeWalkPool timeout to happen - <-time.After(2 * time.Second) - if c1, _ := tw.Release(params); c1 != nil { - t.Error("treeWalk go-routine must have been freed") - } - - // Add the treeWalk back to the pool - endWalkCh = make(chan struct{}) - tw.Set(params, []FileInfoCh{}, endWalkCh) - - // Release the treeWalk before timeout - select { - case <-time.After(1 * time.Second): - break - default: - if c1, _ := tw.Release(params); c1 == nil { - t.Error("treeWalk go-routine got freed before timeout") - } - } -} - -// Test if multiple merge walkers for the same listParams are managed as expected by the pool. -func TestManyMergeWalksSameParam(t *testing.T) { - // Create a treeWalkPool. - tw := NewMergeWalkPool(5 * time.Second) - - // Create sample params. - params := listParams{ - bucket: "test-bucket", - } - - select { - // This timeout is an upper-bound. This is started - // before the first treeWalk go-routine's timeout period starts. - case <-time.After(5 * time.Second): - break - default: - // Create many treeWalk go-routines for the same params. - for i := 0; i < treeWalkSameEntryLimit; i++ { - endWalkCh := make(chan struct{}) - walkChs := make([]FileInfoCh, 0) - tw.Set(params, walkChs, endWalkCh) - } - - tw.Lock() - if walks, ok := tw.pool[params]; ok { - if len(walks) != treeWalkSameEntryLimit { - t.Error("There aren't as many walks as were Set") - } - } - tw.Unlock() - for i := 0; i < treeWalkSameEntryLimit; i++ { - tw.Lock() - if walks, ok := tw.pool[params]; ok { - // Before ith Release we should have n-i treeWalk go-routines. - if treeWalkSameEntryLimit-i != len(walks) { - t.Error("There aren't as many walks as were Set") - } - } - tw.Unlock() - tw.Release(params) - } - } - -} - -// Test if multiple merge walkers for the same listParams are managed as expected by the pool -// but that treeWalkSameEntryLimit is respected. -func TestManyMergeWalksSameParamPrune(t *testing.T) { - // Create a treeWalkPool. - tw := NewMergeWalkPool(5 * time.Second) - - // Create sample params. - params := listParams{ - bucket: "test-bucket", - } - - select { - // This timeout is an upper-bound. This is started - // before the first treeWalk go-routine's timeout period starts. - case <-time.After(5 * time.Second): - break - default: - // Create many treeWalk go-routines for the same params. - for i := 0; i < treeWalkSameEntryLimit*4; i++ { - endWalkCh := make(chan struct{}) - walkChs := make([]FileInfoCh, 0) - tw.Set(params, walkChs, endWalkCh) - } - - tw.Lock() - if walks, ok := tw.pool[params]; ok { - if len(walks) > treeWalkSameEntryLimit { - t.Error("There aren't as many walks as were Set") - } - } - tw.Unlock() - for i := 0; i < treeWalkSameEntryLimit; i++ { - tw.Lock() - if walks, ok := tw.pool[params]; ok { - // Before ith Release we should have n-i treeWalk go-routines. - if treeWalkSameEntryLimit-i != len(walks) { - t.Error("There aren't as many walks as were Set") - } - } - tw.Unlock() - tw.Release(params) - } - } - -} diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 3d1952509..95681c1b8 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -150,13 +150,6 @@ func (d *naughtyDisk) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Wr return d.disk.WalkDir(ctx, opts, wr) } -func (d *naughtyDisk) WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) { - if err := d.calcError(); err != nil { - return nil, err - } - return d.disk.WalkSplunk(ctx, volume, dirPath, marker, endWalkCh) -} - func (d *naughtyDisk) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { if err := d.calcError(); err != nil { return nil, err @@ -164,13 +157,6 @@ func (d *naughtyDisk) WalkVersions(ctx context.Context, volume, dirPath, marker return d.disk.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh) } -func (d *naughtyDisk) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) { - if err := d.calcError(); err != nil { - return nil, err - } - return d.disk.Walk(ctx, volume, dirPath, marker, recursive, endWalkCh) -} - func (d *naughtyDisk) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) { if err := d.calcError(); err != nil { return []string{}, err diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 2f7ed2a3a..a78575c4a 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -22,6 +22,24 @@ import ( //go:generate msgp -file=$GOFILE +// DiskInfo is an extended type which returns current +// disk usage per path. +type DiskInfo struct { + Total uint64 + Free uint64 + Used uint64 + FSType string + RootDisk bool + Healing bool + Endpoint string + MountPath string + ID string + Error string // carries the error over the network +} + +// VolsInfo is a collection of volume(bucket) information +type VolsInfo []VolInfo + // VolInfo - represents volume stat information. type VolInfo struct { // Name of the volume. diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index c6c2c6979..a4608092e 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -6,6 +6,334 @@ import ( "github.com/tinylib/msgp/msgp" ) +// DecodeMsg implements msgp.Decodable +func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Total": + z.Total, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Total") + return + } + case "Free": + z.Free, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Free") + return + } + case "Used": + z.Used, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Used") + return + } + case "FSType": + z.FSType, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "FSType") + return + } + case "RootDisk": + z.RootDisk, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "RootDisk") + return + } + case "Healing": + z.Healing, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Healing") + return + } + case "Endpoint": + z.Endpoint, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Endpoint") + return + } + case "MountPath": + z.MountPath, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "MountPath") + return + } + case "ID": + z.ID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + case "Error": + z.Error, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 10 + // write "Total" + err = en.Append(0x8a, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + if err != nil { + return + } + err = en.WriteUint64(z.Total) + if err != nil { + err = msgp.WrapError(err, "Total") + return + } + // write "Free" + err = en.Append(0xa4, 0x46, 0x72, 0x65, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.Free) + if err != nil { + err = msgp.WrapError(err, "Free") + return + } + // write "Used" + err = en.Append(0xa4, 0x55, 0x73, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.Used) + if err != nil { + err = msgp.WrapError(err, "Used") + return + } + // write "FSType" + err = en.Append(0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65) + if err != nil { + return + } + err = en.WriteString(z.FSType) + if err != nil { + err = msgp.WrapError(err, "FSType") + return + } + // write "RootDisk" + err = en.Append(0xa8, 0x52, 0x6f, 0x6f, 0x74, 0x44, 0x69, 0x73, 0x6b) + if err != nil { + return + } + err = en.WriteBool(z.RootDisk) + if err != nil { + err = msgp.WrapError(err, "RootDisk") + return + } + // write "Healing" + err = en.Append(0xa7, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67) + if err != nil { + return + } + err = en.WriteBool(z.Healing) + if err != nil { + err = msgp.WrapError(err, "Healing") + return + } + // write "Endpoint" + err = en.Append(0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Endpoint) + if err != nil { + err = msgp.WrapError(err, "Endpoint") + return + } + // write "MountPath" + err = en.Append(0xa9, 0x4d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68) + if err != nil { + return + } + err = en.WriteString(z.MountPath) + if err != nil { + err = msgp.WrapError(err, "MountPath") + return + } + // write "ID" + err = en.Append(0xa2, 0x49, 0x44) + if err != nil { + return + } + err = en.WriteString(z.ID) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + // write "Error" + err = en.Append(0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72) + if err != nil { + return + } + err = en.WriteString(z.Error) + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 10 + // string "Total" + o = append(o, 0x8a, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + o = msgp.AppendUint64(o, z.Total) + // string "Free" + o = append(o, 0xa4, 0x46, 0x72, 0x65, 0x65) + o = msgp.AppendUint64(o, z.Free) + // string "Used" + o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x64) + o = msgp.AppendUint64(o, z.Used) + // string "FSType" + o = append(o, 0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65) + o = msgp.AppendString(o, z.FSType) + // string "RootDisk" + o = append(o, 0xa8, 0x52, 0x6f, 0x6f, 0x74, 0x44, 0x69, 0x73, 0x6b) + o = msgp.AppendBool(o, z.RootDisk) + // string "Healing" + o = append(o, 0xa7, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67) + o = msgp.AppendBool(o, z.Healing) + // string "Endpoint" + o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) + o = msgp.AppendString(o, z.Endpoint) + // string "MountPath" + o = append(o, 0xa9, 0x4d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68) + o = msgp.AppendString(o, z.MountPath) + // string "ID" + o = append(o, 0xa2, 0x49, 0x44) + o = msgp.AppendString(o, z.ID) + // string "Error" + o = append(o, 0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72) + o = msgp.AppendString(o, z.Error) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Total": + z.Total, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Total") + return + } + case "Free": + z.Free, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Free") + return + } + case "Used": + z.Used, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Used") + return + } + case "FSType": + z.FSType, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "FSType") + return + } + case "RootDisk": + z.RootDisk, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "RootDisk") + return + } + case "Healing": + z.Healing, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Healing") + return + } + case "Endpoint": + z.Endpoint, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Endpoint") + return + } + case "MountPath": + z.MountPath, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "MountPath") + return + } + case "ID": + z.ID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + case "Error": + z.Error, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *DiskInfo) Msgsize() (s int) { + s = 1 + 6 + msgp.Uint64Size + 5 + msgp.Uint64Size + 5 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.FSType) + 9 + msgp.BoolSize + 8 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 10 + msgp.StringPrefixSize + len(z.MountPath) + 3 + msgp.StringPrefixSize + len(z.ID) + 6 + msgp.StringPrefixSize + len(z.Error) + return +} + // DecodeMsg implements msgp.Decodable func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 @@ -1080,3 +1408,170 @@ func (z VolInfo) Msgsize() (s int) { s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize return } + +// DecodeMsg implements msgp.Decodable +func (z *VolsInfo) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if cap((*z)) >= int(zb0002) { + (*z) = (*z)[:zb0002] + } else { + (*z) = make(VolsInfo, zb0002) + } + for zb0001 := range *z { + var field []byte + _ = field + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + for zb0003 > 0 { + zb0003-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + switch msgp.UnsafeString(field) { + case "Name": + (*z)[zb0001].Name, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, zb0001, "Name") + return + } + case "Created": + (*z)[zb0001].Created, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, zb0001, "Created") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z VolsInfo) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteArrayHeader(uint32(len(z))) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0004 := range z { + // map header, size 2 + // write "Name" + err = en.Append(0x82, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteString(z[zb0004].Name) + if err != nil { + err = msgp.WrapError(err, zb0004, "Name") + return + } + // write "Created" + err = en.Append(0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteTime(z[zb0004].Created) + if err != nil { + err = msgp.WrapError(err, zb0004, "Created") + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z VolsInfo) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendArrayHeader(o, uint32(len(z))) + for zb0004 := range z { + // map header, size 2 + // string "Name" + o = append(o, 0x82, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z[zb0004].Name) + // string "Created" + o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) + o = msgp.AppendTime(o, z[zb0004].Created) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *VolsInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if cap((*z)) >= int(zb0002) { + (*z) = (*z)[:zb0002] + } else { + (*z) = make(VolsInfo, zb0002) + } + for zb0001 := range *z { + var field []byte + _ = field + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + for zb0003 > 0 { + zb0003-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + switch msgp.UnsafeString(field) { + case "Name": + (*z)[zb0001].Name, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001, "Name") + return + } + case "Created": + (*z)[zb0001].Created, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001, "Created") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z VolsInfo) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + for zb0004 := range z { + s += 1 + 5 + msgp.StringPrefixSize + len(z[zb0004].Name) + 8 + msgp.TimeSize + } + return +} diff --git a/cmd/storage-datatypes_gen_test.go b/cmd/storage-datatypes_gen_test.go index 16c78d68c..00432e565 100644 --- a/cmd/storage-datatypes_gen_test.go +++ b/cmd/storage-datatypes_gen_test.go @@ -9,6 +9,119 @@ import ( "github.com/tinylib/msgp/msgp" ) +func TestMarshalUnmarshalDiskInfo(t *testing.T) { + v := DiskInfo{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgDiskInfo(b *testing.B) { + v := DiskInfo{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgDiskInfo(b *testing.B) { + v := DiskInfo{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalDiskInfo(b *testing.B) { + v := DiskInfo{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeDiskInfo(t *testing.T) { + v := DiskInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeDiskInfo Msgsize() is inaccurate") + } + + vn := DiskInfo{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeDiskInfo(b *testing.B) { + v := DiskInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeDiskInfo(b *testing.B) { + v := DiskInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalFileInfo(t *testing.T) { v := FileInfo{} bts, err := v.MarshalMsg(nil) @@ -573,3 +686,116 @@ func BenchmarkDecodeVolInfo(b *testing.B) { } } } + +func TestMarshalUnmarshalVolsInfo(t *testing.T) { + v := VolsInfo{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgVolsInfo(b *testing.B) { + v := VolsInfo{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgVolsInfo(b *testing.B) { + v := VolsInfo{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalVolsInfo(b *testing.B) { + v := VolsInfo{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeVolsInfo(t *testing.T) { + v := VolsInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeVolsInfo Msgsize() is inaccurate") + } + + vn := VolsInfo{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeVolsInfo(b *testing.B) { + v := VolsInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeVolsInfo(b *testing.B) { + v := VolsInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/storage-datatypes_test.go b/cmd/storage-datatypes_test.go index e91aa48fa..2dca7d00c 100644 --- a/cmd/storage-datatypes_test.go +++ b/cmd/storage-datatypes_test.go @@ -1,3 +1,19 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cmd import ( @@ -9,6 +25,116 @@ import ( "github.com/tinylib/msgp/msgp" ) +func BenchmarkDecodeDiskInfoMsgp(b *testing.B) { + v := DiskInfo{ + Total: 1000, + Free: 1000, + Used: 1000, + FSType: "xfs", + RootDisk: true, + Healing: true, + Endpoint: "http://localhost:9001/tmp/disk1", + MountPath: "/tmp/disk1", + ID: "uuid", + Error: "", + } + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(1) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.Log("Size:", buf.Len(), "bytes") + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkDecodeDiskInfoGOB(b *testing.B) { + v := DiskInfo{ + Total: 1000, + Free: 1000, + Used: 1000, + FSType: "xfs", + RootDisk: true, + Healing: true, + Endpoint: "http://localhost:9001/tmp/disk1", + MountPath: "/tmp/disk1", + ID: "uuid", + Error: "", + } + + var buf bytes.Buffer + gob.NewEncoder(&buf).Encode(v) + encoded := buf.Bytes() + b.Log("Size:", buf.Len(), "bytes") + b.SetBytes(1) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + dec := gob.NewDecoder(bytes.NewBuffer(encoded)) + err := dec.Decode(&v) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkEncodeDiskInfoMsgp(b *testing.B) { + v := DiskInfo{ + Total: 1000, + Free: 1000, + Used: 1000, + FSType: "xfs", + RootDisk: true, + Healing: true, + Endpoint: "http://localhost:9001/tmp/disk1", + MountPath: "/tmp/disk1", + ID: "uuid", + Error: "", + } + + b.SetBytes(1) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := msgp.Encode(ioutil.Discard, &v) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkEncodeDiskInfoGOB(b *testing.B) { + v := DiskInfo{ + Total: 1000, + Free: 1000, + Used: 1000, + FSType: "xfs", + RootDisk: true, + Healing: true, + Endpoint: "http://localhost:9001/tmp/disk1", + MountPath: "/tmp/disk1", + ID: "uuid", + Error: "", + } + + enc := gob.NewEncoder(ioutil.Discard) + b.SetBytes(1) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := enc.Encode(&v) + if err != nil { + b.Fatal(err) + } + } +} + func BenchmarkDecodeFileInfoMsgp(b *testing.B) { v := FileInfo{Volume: "testbucket", Name: "src/compress/zlib/reader_test.go", VersionID: "", IsLatest: true, Deleted: false, DataDir: "5e0153cc-621a-4267-8cb6-4919140d53b3", XLV1: false, ModTime: UTCNow(), Size: 3430, Mode: 0x0, Metadata: map[string]string{"X-Minio-Internal-Server-Side-Encryption-Iv": "jIJPsrkkVYYMvc7edBrNl+7zcM7+ZwXqMb/YAjBO/ck=", "X-Minio-Internal-Server-Side-Encryption-S3-Kms-Key-Id": "my-minio-key", "X-Minio-Internal-Server-Side-Encryption-S3-Kms-Sealed-Key": "IAAfAP2p7ZLv3UpLwBnsKkF2mtWba0qoY42tymK0szRgGvAxBNcXyHXYooe9dQpeeEJWgKUa/8R61oCy1mFwIg==", "X-Minio-Internal-Server-Side-Encryption-S3-Sealed-Key": "IAAfAPFYRDkHVirJBJxBixNj3PLWt78dFuUTyTLIdLG820J7XqLPBO4gpEEEWw/DoTsJIb+apnaem+rKtQ1h3Q==", "X-Minio-Internal-Server-Side-Encryption-Seal-Algorithm": "DAREv2-HMAC-SHA256", "content-type": "application/octet-stream", "etag": "20000f00e2c3709dc94905c6ce31e1cadbd1c064e14acdcd44cf0ac2db777eeedd88d639fcd64de16851ade8b21a9a1a"}, Parts: []ObjectPartInfo{{ETag: "", Number: 1, Size: 3430, ActualSize: 3398}}, Erasure: ErasureInfo{Algorithm: "reedsolomon", DataBlocks: 2, ParityBlocks: 2, BlockSize: 10485760, Index: 3, Distribution: []int{3, 4, 1, 2}, Checksums: []ChecksumInfo{{PartNumber: 1, Algorithm: 0x3, Hash: []uint8{}}}}} var buf bytes.Buffer diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 65761c4e6..00d527a37 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -53,10 +53,6 @@ type StorageAPI interface { // WalkVersions in sorted order directly on disk. WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) - // Walk in sorted order directly on disk. - Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) - // Walk in sorted order directly on disk. - WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) // Metadata operations DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index ee159480f..cbe521e5a 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -207,7 +207,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e return info, err } defer http.DrainBody(respBody) - err = gob.NewDecoder(respBody).Decode(&info) + err = msgp.Decode(respBody, &info) if err != nil { return info, err } @@ -247,8 +247,9 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo, return } defer http.DrainBody(respBody) - err = gob.NewDecoder(respBody).Decode(&vols) - return vols, err + vinfos := VolsInfo(vols) + err = msgp.Decode(respBody, &vinfos) + return vinfos, err } // StatVol - get volume info over the network. @@ -260,7 +261,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo return } defer http.DrainBody(respBody) - err = gob.NewDecoder(respBody).Decode(&vol) + err = msgp.Decode(respBody, &vol) return vol, err } @@ -444,40 +445,6 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa return int64(n), err } -func (client *storageRESTClient) WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) { - values := make(url.Values) - values.Set(storageRESTVolume, volume) - values.Set(storageRESTDirPath, dirPath) - values.Set(storageRESTMarkerPath, marker) - respBody, err := client.call(ctx, storageRESTMethodWalkSplunk, values, nil, -1) - if err != nil { - return nil, err - } - - ch := make(chan FileInfo) - go func() { - defer close(ch) - defer http.DrainBody(respBody) - - decoder := msgp.NewReader(respBody) - for { - var fi FileInfo - if gerr := fi.DecodeMsg(decoder); gerr != nil { - // Upon error return - return - } - select { - case ch <- fi: - case <-endWalkCh: - return - } - - } - }() - - return ch, nil -} - func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { values := make(url.Values) values.Set(storageRESTVolume, volume) @@ -515,41 +482,6 @@ func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPa return ch, nil } -func (client *storageRESTClient) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) { - values := make(url.Values) - values.Set(storageRESTVolume, volume) - values.Set(storageRESTDirPath, dirPath) - values.Set(storageRESTMarkerPath, marker) - values.Set(storageRESTRecursive, strconv.FormatBool(recursive)) - respBody, err := client.call(ctx, storageRESTMethodWalk, values, nil, -1) - if err != nil { - return nil, err - } - - ch := make(chan FileInfo) - go func() { - defer close(ch) - defer http.DrainBody(respBody) - - decoder := msgp.NewReader(respBody) - for { - var fi FileInfo - if gerr := fi.DecodeMsg(decoder); gerr != nil { - // Upon error return - return - } - select { - case ch <- fi: - case <-endWalkCh: - return - } - - } - }() - - return ch, nil -} - // ListDir - lists a directory. func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) { values := make(url.Values) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 0fba8f97d..289c13f20 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -45,9 +45,7 @@ const ( storageRESTMethodReadFile = "/readfile" storageRESTMethodReadFileStream = "/readfilestream" storageRESTMethodListDir = "/listdir" - storageRESTMethodWalk = "/walk" storageRESTMethodWalkVersions = "/walkversions" - storageRESTMethodWalkSplunk = "/walksplunk" storageRESTMethodDeleteFile = "/deletefile" storageRESTMethodDeleteVersions = "/deleteverions" storageRESTMethodRenameFile = "/renamefile" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index e42f5f4e8..6b57c6052 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -154,7 +154,7 @@ func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Reque info.Error = err.Error() } defer w.(http.Flusher).Flush() - gob.NewEncoder(w).Encode(info) + logger.LogIf(r.Context(), msgp.Encode(w, &info)) } func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r *http.Request) { @@ -219,8 +219,8 @@ func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Reque s.writeErrorResponse(w, err) return } - gob.NewEncoder(w).Encode(&infos) - w.(http.Flusher).Flush() + defer w.(http.Flusher).Flush() + logger.LogIf(r.Context(), msgp.Encode(w, VolsInfo(infos))) } // StatVolHandler - stat a volume. @@ -235,8 +235,8 @@ func (s *storageRESTServer) StatVolHandler(w http.ResponseWriter, r *http.Reques s.writeErrorResponse(w, err) return } - gob.NewEncoder(w).Encode(info) - w.(http.Flusher).Flush() + defer w.(http.Flusher).Flush() + logger.LogIf(r.Context(), msgp.Encode(w, &info)) } // DeleteVolumeHandler - delete a volume. @@ -529,30 +529,6 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http w.(http.Flusher).Flush() } -// WalkHandler - remote caller to start walking at a requested directory path. -func (s *storageRESTServer) WalkSplunkHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - return - } - vars := mux.Vars(r) - volume := vars[storageRESTVolume] - dirPath := vars[storageRESTDirPath] - markerPath := vars[storageRESTMarkerPath] - - setEventStreamHeaders(w) - - fch, err := s.storage.WalkSplunk(r.Context(), volume, dirPath, markerPath, r.Context().Done()) - if err != nil { - s.writeErrorResponse(w, err) - return - } - encoder := msgp.NewWriter(w) - for fi := range fch { - logger.LogIf(r.Context(), fi.EncodeMsg(encoder)) - } - logger.LogIf(r.Context(), encoder.Flush()) -} - // WalkVersionsHandler - remote caller to start walking at a requested directory path. func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -582,35 +558,6 @@ func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.R logger.LogIf(r.Context(), encoder.Flush()) } -// WalkHandler - remote caller to start walking at a requested directory path. -func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - return - } - vars := mux.Vars(r) - volume := vars[storageRESTVolume] - dirPath := vars[storageRESTDirPath] - markerPath := vars[storageRESTMarkerPath] - recursive, err := strconv.ParseBool(vars[storageRESTRecursive]) - if err != nil { - s.writeErrorResponse(w, err) - return - } - - setEventStreamHeaders(w) - - fch, err := s.storage.Walk(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done()) - if err != nil { - s.writeErrorResponse(w, err) - return - } - encoder := msgp.NewWriter(w) - for fi := range fch { - logger.LogIf(r.Context(), fi.EncodeMsg(encoder)) - } - logger.LogIf(r.Context(), encoder.Flush()) -} - // ListDirHandler - list a directory. func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1119,10 +1066,6 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerSets Endpoint Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)). Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount)...) - subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)). - Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...) - subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkSplunk).HandlerFunc(httpTraceHdrs(server.WalkSplunkHandler)). - Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkVersions).HandlerFunc(httpTraceHdrs(server.WalkVersionsHandler)). Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...) diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 11975e885..9f36554ec 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -149,22 +149,6 @@ func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath return p.storage.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh) } -func (p *xlStorageDiskIDCheck) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) { - if err := p.checkDiskStale(); err != nil { - return nil, err - } - - return p.storage.Walk(ctx, volume, dirPath, marker, recursive, endWalkCh) -} - -func (p *xlStorageDiskIDCheck) WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) { - if err := p.checkDiskStale(); err != nil { - return nil, err - } - - return p.storage.WalkSplunk(ctx, volume, dirPath, marker, endWalkCh) -} - func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) { if err := p.checkDiskStale(); err != nil { return nil, err diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 63442502e..f9947876b 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -446,21 +446,6 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac return dataUsageInfo, nil } -// DiskInfo is an extended type which returns current -// disk usage per path. -type DiskInfo struct { - Total uint64 - Free uint64 - Used uint64 - FSType string - RootDisk bool - Healing bool - Endpoint string - MountPath string - ID string - Error string // carries the error over the network -} - // DiskInfo provides current information about disk space usage, // total free inodes and underlying filesystem. func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) { @@ -853,79 +838,6 @@ func (s *xlStorage) isLeafDir(volume, leafPath string) bool { return isDirEmpty(pathJoin(volumeDir, leafPath)) } -// WalkSplunk - is a sorted walker which returns file entries in lexically -// sorted order, additionally along with metadata about each of those entries. -// Implemented specifically for Splunk backend structure and List call with -// delimiter as "guidSplunk" -func (s *xlStorage) WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (ch chan FileInfo, err error) { - // Verify if volume is valid and it exists. - volumeDir, err := s.getVolDir(volume) - if err != nil { - return nil, err - } - - // Stat a volume entry. - _, err = os.Stat(volumeDir) - if err != nil { - if os.IsNotExist(err) { - return nil, errVolumeNotFound - } else if isSysErrIO(err) { - return nil, errFaultyDisk - } - return nil, err - } - - ch = make(chan FileInfo) - go func() { - defer close(ch) - listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) { - entries, err := s.ListDirSplunk(volume, dirPath, -1) - if err != nil { - return false, nil, false - } - if len(entries) == 0 { - return true, nil, false - } - entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeafSplunk) - return false, entries, delayIsLeaf - } - - walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, true, listDir, s.isLeafSplunk, s.isLeafDir, endWalkCh) - for walkResult := range walkResultCh { - var fi FileInfo - if HasSuffix(walkResult.entry, SlashSeparator) { - fi = FileInfo{ - Volume: volume, - Name: walkResult.entry, - Mode: uint32(os.ModeDir), - } - } else { - var err error - var xlMetaBuf []byte - xlMetaBuf, err = ioutil.ReadFile(pathJoin(volumeDir, walkResult.entry, xlStorageFormatFile)) - if err != nil { - continue - } - fi, err = getFileInfo(xlMetaBuf, volume, walkResult.entry, "") - if err != nil { - continue - } - if fi.Deleted { - // Ignore delete markers. - continue - } - } - select { - case ch <- fi: - case <-endWalkCh: - return - } - } - }() - - return ch, nil -} - // WalkVersions - is a sorted walker which returns file entries in lexically sorted order, // additionally along with metadata version info about each of those entries. func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (ch chan FileInfoVersions, err error) { @@ -1011,90 +923,6 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st return ch, nil } -// Walk - is a sorted walker which returns file entries in lexically -// sorted order, additionally along with metadata about each of those entries. -func (s *xlStorage) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (ch chan FileInfo, err error) { - atomic.AddInt32(&s.activeIOCount, 1) - defer func() { - atomic.AddInt32(&s.activeIOCount, -1) - }() - - // Verify if volume is valid and it exists. - volumeDir, err := s.getVolDir(volume) - if err != nil { - return nil, err - } - - // Stat a volume entry. - _, err = os.Stat(volumeDir) - if err != nil { - if os.IsNotExist(err) { - return nil, errVolumeNotFound - } else if isSysErrIO(err) { - return nil, errFaultyDisk - } - return nil, err - } - - // Fast exit track to check if we are listing an object with - // a trailing slash, this will avoid to list the object content. - if HasSuffix(dirPath, SlashSeparator) { - if st, err := os.Stat(pathJoin(volumeDir, dirPath, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() { - return nil, errFileNotFound - } - } - - ch = make(chan FileInfo) - go func() { - defer close(ch) - listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) { - entries, err := s.ListDir(ctx, volume, dirPath, -1) - if err != nil { - return false, nil, false - } - if len(entries) == 0 { - return true, nil, false - } - entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeaf) - return false, entries, delayIsLeaf - } - - walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, s.isLeaf, s.isLeafDir, endWalkCh) - for walkResult := range walkResultCh { - var fi FileInfo - if HasSuffix(walkResult.entry, SlashSeparator) { - fi = FileInfo{ - Volume: volume, - Name: walkResult.entry, - Mode: uint32(os.ModeDir), - } - } else { - var err error - var xlMetaBuf []byte - xlMetaBuf, err = ioutil.ReadFile(pathJoin(volumeDir, walkResult.entry, xlStorageFormatFile)) - if err != nil { - continue - } - fi, err = getFileInfo(xlMetaBuf, volume, walkResult.entry, "") - if err != nil { - continue - } - if fi.Deleted { - // Ignore delete markers. - continue - } - } - select { - case ch <- fi: - case <-endWalkCh: - return - } - } - }() - - return ch, nil -} - // ListDir - return all the entries at the given directory path. // If an entry is a directory it will be returned with a trailing SlashSeparator. func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {