diff --git a/cmd/merge-walk-pool.go b/cmd/merge-walk-pool.go new file mode 100644 index 000000000..b09fd6740 --- /dev/null +++ b/cmd/merge-walk-pool.go @@ -0,0 +1,143 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "reflect" + "sync" + "time" +) + +const ( + globalMergeLookupTimeout = time.Minute * 1 // 1 minutes. +) + +// mergeWalk - represents the go routine that does the merge walk. +type mergeWalk struct { + entryChs []FileInfoCh + endWalkCh chan struct{} // To signal when mergeWalk go-routine should end. + endTimerCh chan<- struct{} // To signal when timer go-routine should end. +} + +// MergeWalkPool - pool of mergeWalk go routines. +// A mergeWalk is added to the pool by Set() and removed either by +// doing a Release() or if the concerned timer goes off. +// mergeWalkPool's purpose is to maintain active mergeWalk go-routines in a map so that +// it can be looked up across related list calls. +type MergeWalkPool struct { + pool map[listParams][]mergeWalk + timeOut time.Duration + lock *sync.Mutex +} + +// NewMergeWalkPool - initialize new tree walk pool. +func NewMergeWalkPool(timeout time.Duration) *MergeWalkPool { + tPool := &MergeWalkPool{ + pool: make(map[listParams][]mergeWalk), + timeOut: timeout, + lock: &sync.Mutex{}, + } + return tPool +} + +// Release - selects a mergeWalk from the pool based on the input +// listParams, removes it from the pool, and returns the MergeWalkResult +// channel. +// Returns nil if listParams does not have an asccociated mergeWalk. +func (t MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) { + t.lock.Lock() + defer t.lock.Unlock() + walks, ok := t.pool[params] // Pick the valid walks. + if ok { + if len(walks) > 0 { + // Pop out the first valid walk entry. + walk := walks[0] + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks + } else { + delete(t.pool, params) + } + walk.endTimerCh <- struct{}{} + return walk.entryChs, walk.endWalkCh + } + } + // Release return nil if params not found. + return nil, nil +} + +// Set - adds a mergeWalk to the mergeWalkPool. +// Also starts a timer go-routine that ends when: +// 1) time.After() expires after t.timeOut seconds. +// The expiration is needed so that the mergeWalk go-routine resources are freed after a timeout +// if the S3 client does only partial listing of objects. +// 2) Relase() signals the timer go-routine to end on endTimerCh. +// During listing the timer should not timeout and end the mergeWalk go-routine, hence the +// timer go-routine should be ended. +func (t MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh chan struct{}) { + t.lock.Lock() + defer t.lock.Unlock() + + // Should be a buffered channel so that Release() never blocks. + endTimerCh := make(chan struct{}, 1) + + walkInfo := mergeWalk{ + entryChs: resultChs, + endWalkCh: endWalkCh, + endTimerCh: endTimerCh, + } + + // Append new walk info. + t.pool[params] = append(t.pool[params], walkInfo) + + // Timer go-routine which times out after t.timeOut seconds. + go func(endTimerCh <-chan struct{}, walkInfo mergeWalk) { + select { + // Wait until timeOut + case <-time.After(t.timeOut): + // Timeout has expired. Remove the mergeWalk from mergeWalkPool and + // end the mergeWalk go-routine. + t.lock.Lock() + walks, ok := t.pool[params] + if ok { + // Trick of filtering without allocating + // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating + nwalks := walks[:0] + // Look for walkInfo, remove it from the walks list. + for _, walk := range walks { + if !reflect.DeepEqual(walk, walkInfo) { + nwalks = append(nwalks, walk) + } + } + if len(nwalks) == 0 { + // No more mergeWalk go-routines associated with listParams + // hence remove map entry. + delete(t.pool, params) + } else { + // There are more mergeWalk go-routines associated with listParams + // hence save the list in the map. + t.pool[params] = nwalks + } + } + // Signal the mergeWalk go-routine to die. + close(endWalkCh) + t.lock.Unlock() + case <-endTimerCh: + return + } + }(endTimerCh, walkInfo) +} diff --git a/cmd/merge-walk-pool_test.go b/cmd/merge-walk-pool_test.go new file mode 100644 index 000000000..def12291a --- /dev/null +++ b/cmd/merge-walk-pool_test.go @@ -0,0 +1,103 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "testing" + "time" +) + +// Test if tree walker go-routine is removed from the pool after timeout +// and that is available in the pool before the timeout. +func TestMergeWalkPoolBasic(t *testing.T) { + // Create a treeWalkPool + tw := NewMergeWalkPool(1 * time.Second) + + // Create sample params + params := listParams{ + bucket: "test-bucket", + } + + endWalkCh := make(chan struct{}) + // Add a treeWalk to the pool + tw.Set(params, []FileInfoCh{}, endWalkCh) + + // Wait for treeWalkPool timeout to happen + <-time.After(2 * time.Second) + if c1, _ := tw.Release(params); c1 != nil { + t.Error("treeWalk go-routine must have been freed") + } + + // Add the treeWalk back to the pool + endWalkCh = make(chan struct{}) + tw.Set(params, []FileInfoCh{}, endWalkCh) + + // Release the treeWalk before timeout + select { + case <-time.After(1 * time.Second): + break + default: + if c1, _ := tw.Release(params); c1 == nil { + t.Error("treeWalk go-routine got freed before timeout") + } + } +} + +// Test if multiple merge walkers for the same listParams are managed as expected by the pool. +func TestManyMergeWalksSameParam(t *testing.T) { + // Create a treeWalkPool. + tw := NewMergeWalkPool(5 * time.Second) + + // Create sample params. + params := listParams{ + bucket: "test-bucket", + } + + select { + // This timeout is an upper-bound. This is started + // before the first treeWalk go-routine's timeout period starts. + case <-time.After(5 * time.Second): + break + default: + // Create many treeWalk go-routines for the same params. + for i := 0; i < 10; i++ { + endWalkCh := make(chan struct{}) + walkChs := make([]FileInfoCh, 0) + tw.Set(params, walkChs, endWalkCh) + } + + tw.lock.Lock() + if walks, ok := tw.pool[params]; ok { + if len(walks) != 10 { + t.Error("There aren't as many walks as were Set") + } + } + tw.lock.Unlock() + for i := 0; i < 10; i++ { + tw.lock.Lock() + if walks, ok := tw.pool[params]; ok { + // Before ith Release we should have 10-i treeWalk go-routines. + if 10-i != len(walks) { + t.Error("There aren't as many walks as were Set") + } + } + tw.lock.Unlock() + tw.Release(params) + } + } + +} diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 6b20eba15..4a48c9791 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -111,6 +111,13 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) { return d.disk.DeleteVol(volume) } +func (d *naughtyDisk) Walk(volume, path, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) { + if err := d.calcError(); err != nil { + return nil, err + } + return d.disk.Walk(volume, path, marker, recursive, leafFile, readMetadataFn, endWalkCh) +} + func (d *naughtyDisk) ListDir(volume, path string, count int, leafFile string) (entries []string, err error) { if err := d.calcError(); err != nil { return []string{}, err diff --git a/cmd/posix.go b/cmd/posix.go index 5ee157a2c..4be0d1447 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2016, 2017, 2018 MinIO, Inc. + * MinIO Cloud Storage, (C) 2016-2019 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import ( slashpath "path" "path/filepath" "runtime" + "sort" "strings" "sync" "sync/atomic" @@ -45,7 +46,9 @@ const ( diskMinTotalSpace = diskMinFreeSpace // Min 900MiB total space. maxAllowedIOError = 5 posixWriteBlockSize = 4 * humanize.MiByte - directioAlignSize = 4096 // DirectIO alignment needs to be 4K. Defined here as directio.AlignSize is defined as 0 in MacOS causing divide by 0 error. + // DirectIO alignment needs to be 4K. Defined here as + // directio.AlignSize is defined as 0 in MacOS causing divide by 0 error. + directioAlignSize = 4096 ) // isValidVolname verifies a volname name in accordance with object @@ -642,6 +645,85 @@ func (s *posix) DeleteVol(volume string) (err error) { return nil } +// Walk - is a sorted walker which returns file entries in lexically +// sorted order, additionally along with metadata about each of those entries. +func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile string, + readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (ch chan FileInfo, err error) { + + defer func() { + if err == errFaultyDisk { + atomic.AddInt32(&s.ioErrCount, 1) + } + }() + + if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError { + return nil, errFaultyDisk + } + + if err = s.checkDiskFound(); err != nil { + return nil, err + } + + // Verify if volume is valid and it exists. + volumeDir, err := s.getVolDir(volume) + if err != nil { + return nil, err + } + + // Stat a volume entry. + _, err = os.Stat(volumeDir) + if err != nil { + if os.IsNotExist(err) { + return nil, errVolumeNotFound + } else if isSysErrIO(err) { + return nil, errFaultyDisk + } + return nil, err + } + + ch = make(chan FileInfo) + go func() { + defer close(ch) + listDir := func(volume, dirPath, dirEntry string) (entries []string) { + entries, err := s.ListDir(volume, dirPath, -1, leafFile) + if err != nil { + return + } + sort.Strings(entries) + return filterMatchingPrefix(entries, dirEntry) + } + + walkResultCh := startTreeWalk(context.Background(), volume, dirPath, marker, recursive, listDir, endWalkCh) + for { + walkResult, ok := <-walkResultCh + if !ok { + return + } + var fi FileInfo + if hasSuffix(walkResult.entry, slashSeparator) { + fi = FileInfo{ + Volume: volume, + Name: walkResult.entry, + Mode: os.ModeDir, + } + } else { + buf, err := s.ReadAll(volume, pathJoin(walkResult.entry, leafFile)) + if err != nil { + continue + } + fi = readMetadataFn(buf, volume, walkResult.entry) + } + select { + case ch <- fi: + case <-endWalkCh: + return + } + } + }() + + return ch, nil +} + // ListDir - return all the entries at the given directory path. // If an entry is a directory it will be returned with a trailing "/". func (s *posix) ListDir(volume, dirPath string, count int, leafFile string) (entries []string, err error) { diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index a2bdb760e..cfa326a28 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -30,6 +30,13 @@ type VolInfo struct { Created time.Time } +// FilesInfo represent a list of files, additionally +// indicates if the list is last. +type FilesInfo struct { + Files []FileInfo + IsTruncated bool +} + // FileInfo - represents file stat information. type FileInfo struct { // Name of the volume. @@ -46,4 +53,12 @@ type FileInfo struct { // File mode bits. Mode os.FileMode + + // File metadata + Metadata map[string]string + + // All the parts per object. + Parts []ObjectPartInfo + + Quorum int } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 55dc66362..0502c6952 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -38,6 +38,10 @@ type StorageAPI interface { StatVol(volume string) (vol VolInfo, err error) DeleteVol(volume string) (err error) + // Walk in sorted order directly on disk. + Walk(volume, dirPath string, marker string, recursive bool, leafFile string, + readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) + // File operations. ListDir(volume, dirPath string, count int, leafFile string) ([]string, error) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 48d6721ff..ede29de94 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -330,6 +330,43 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf return int64(n), err } +func (client *storageRESTClient) Walk(volume, dirPath, marker string, recursive bool, leafFile string, + readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTDirPath, dirPath) + values.Set(storageRESTMarkerPath, marker) + values.Set(storageRESTRecursive, strconv.FormatBool(recursive)) + values.Set(storageRESTLeafFile, leafFile) + respBody, err := client.call(storageRESTMethodWalk, values, nil, -1) + if err != nil { + return nil, err + } + + ch := make(chan FileInfo) + go func() { + defer close(ch) + defer http.DrainBody(respBody) + + decoder := gob.NewDecoder(respBody) + for { + var fi FileInfo + if gerr := decoder.Decode(&fi); gerr != nil { + // Upon error return + return + } + select { + case ch <- fi: + case <-endWalkCh: + return + } + + } + }() + + return ch, nil +} + // ListDir - lists a directory. func (client *storageRESTClient) ListDir(volume, dirPath string, count int, leafFile string) (entries []string, err error) { values := make(url.Values) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 7e087f42b..ca64d4a93 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -16,7 +16,7 @@ package cmd -const storageRESTVersion = "v5" +const storageRESTVersion = "v6" const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/" const ( @@ -34,6 +34,7 @@ const ( storageRESTMethodReadFile = "readfile" storageRESTMethodReadFileStream = "readfilestream" storageRESTMethodListDir = "listdir" + storageRESTMethodWalk = "walk" storageRESTMethodDeleteFile = "deletefile" storageRESTMethodDeleteFileBulk = "deletefilebulk" storageRESTMethodRenameFile = "renamefile" @@ -51,7 +52,9 @@ const ( storageRESTOffset = "offset" storageRESTLength = "length" storageRESTCount = "count" + storageRESTMarkerPath = "marker" storageRESTLeafFile = "leaf-file" + storageRESTRecursive = "recursive" storageRESTBitrotAlgo = "bitrot-algo" storageRESTBitrotHash = "bitrot-hash" storageRESTInstanceID = "instance-id" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 702166ca8..797c34875 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -17,6 +17,7 @@ package cmd import ( + "context" "encoding/gob" "encoding/hex" "errors" @@ -369,6 +370,57 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http w.(http.Flusher).Flush() } +// readMetadata func provides the function types for reading leaf metadata. +type readMetadataFunc func(buf []byte, volume, entry string) FileInfo + +func readMetadata(buf []byte, volume, entry string) FileInfo { + m, err := xlMetaV1UnmarshalJSON(context.Background(), buf) + if err != nil { + return FileInfo{} + } + return FileInfo{ + Volume: volume, + Name: entry, + ModTime: m.Stat.ModTime, + Size: m.Stat.Size, + Metadata: m.Meta, + Parts: m.Parts, + Quorum: m.Erasure.DataBlocks, + } +} + +// WalkHandler - remote caller to start walking at a requested directory path. +func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + dirPath := vars[storageRESTDirPath] + markerPath := vars[storageRESTMarkerPath] + recursive, err := strconv.ParseBool(vars[storageRESTRecursive]) + if err != nil { + s.writeErrorResponse(w, err) + return + } + leafFile := vars[storageRESTLeafFile] + + endWalkCh := make(chan struct{}) + defer close(endWalkCh) + + fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, leafFile, readMetadata, endWalkCh) + if err != nil { + s.writeErrorResponse(w, err) + return + } + defer w.(http.Flusher).Flush() + + encoder := gob.NewEncoder(w) + for fi := range fch { + encoder.Encode(&fi) + } +} + // ListDirHandler - list a directory. func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -479,6 +531,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) { Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)). Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)). + Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTLeafFile)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)). diff --git a/cmd/tree-walk-pool.go b/cmd/tree-walk-pool.go index 7ed4cc821..df09a5dec 100644 --- a/cmd/tree-walk-pool.go +++ b/cmd/tree-walk-pool.go @@ -18,6 +18,7 @@ package cmd import ( "errors" + "reflect" "sync" "time" ) @@ -127,20 +128,23 @@ func (t TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endWa t.lock.Lock() walks, ok := t.pool[params] if ok { + // Trick of filtering without allocating + // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating + nwalks := walks[:0] // Look for walkInfo, remove it from the walks list. - for i, walk := range walks { - if walk == walkInfo { - walks = append(walks[:i], walks[i+1:]...) + for _, walk := range walks { + if !reflect.DeepEqual(walk, walkInfo) { + nwalks = append(nwalks, walk) } } - if len(walks) == 0 { + if len(nwalks) == 0 { // No more treeWalk go-routines associated with listParams // hence remove map entry. delete(t.pool, params) } else { // There are more treeWalk go-routines associated with listParams // hence save the list in the map. - t.pool[params] = walks + t.pool[params] = nwalks } } // Signal the treeWalk go-routine to die. diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index b45cc13e0..886269d87 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -23,7 +23,6 @@ import ( "io" "net/http" "sort" - "strings" "sync" "time" @@ -76,8 +75,8 @@ type xlSets struct { // Distribution algorithm of choice. distributionAlgo string - // Pack level listObjects pool management. - listPool *TreeWalkPool + // Merge tree walk + pool *MergeWalkPool } // isConnected - checks if the endpoint is connected or not. @@ -270,7 +269,7 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP format: format, disksConnectDoneCh: make(chan struct{}), distributionAlgo: format.XL.DistributionAlgo, - listPool: NewTreeWalkPool(globalLookupTimeout), + pool: NewMergeWalkPool(globalMergeLookupTimeout), } mutex := newNSLock(globalIsDistXL) @@ -698,7 +697,6 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke } // Returns function "listDir" of the type listDirFunc. -// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. // disks - used for doing disk.ListDir(). Sets passes set of disks. func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc { listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) { @@ -765,23 +763,240 @@ func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc { return listDir } -// ListObjects - implements listing of objects across sets, each set is independently -// listed and subsequently merge lexically sorted inside listDirSetsFactory(). Resulting -// value through the walk channel receives the data properly lexically sorted. -func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { - listDir := listDirSetsFactory(ctx, s.sets...) +// FileInfoCh - file info channel +type FileInfoCh struct { + Ch chan FileInfo + Prev FileInfo + Valid bool +} + +// Pop - pops a cached entry if any, or from the cached channel. +func (f *FileInfoCh) Pop() (fi FileInfo, ok bool) { + if f.Valid { + f.Valid = false + return f.Prev, true + } // No cached entries found, read from channel + f.Prev, ok = <-f.Ch + return f.Prev, ok +} + +// Push - cache an entry, for Pop() later. +func (f *FileInfoCh) Push(fi FileInfo) { + f.Prev = fi + f.Valid = true +} + +// Calculate least entry across multiple FileInfo channels, additionally +// returns a boolean to indicate if the caller needs to call again. +func leastEntry(entriesCh []FileInfoCh, readQuorum int) (FileInfo, bool) { + var entriesValid = make([]bool, len(entriesCh)) + var entries = make([]FileInfo, len(entriesCh)) + for i := range entriesCh { + entries[i], entriesValid[i] = entriesCh[i].Pop() + } + + var isTruncated = false + for _, valid := range entriesValid { + if !valid { + continue + } + isTruncated = true + break + } + + var lentry FileInfo + var found bool + for i, valid := range entriesValid { + if !valid { + continue + } + if !found { + lentry = entries[i] + found = true + continue + } + if entries[i].Name < lentry.Name { + lentry = entries[i] + } + } + + // We haven't been able to find any least entry, + // this would mean that we don't have valid. + if !found { + return lentry, isTruncated + } + + leastEntryCount := 0 + for i, valid := range entriesValid { + if !valid { + continue + } + + // Entries are duplicated across disks, + // we should simply skip such entries. + if lentry.Name == entries[i].Name && lentry.ModTime.Equal(entries[i].ModTime) { + leastEntryCount++ + continue + } + + // Push all entries which are lexically higher + // and will be returned later in Pop() + entriesCh[i].Push(entries[i]) + } + + quorum := lentry.Quorum + if quorum == 0 { + quorum = readQuorum + } + + if leastEntryCount >= quorum { + return lentry, isTruncated + } + + return leastEntry(entriesCh, readQuorum) +} + +// mergeEntriesCh - merges FileInfo channel to entries upto maxKeys. +func mergeEntriesCh(entriesCh []FileInfoCh, maxKeys int, readQuorum int) (entries FilesInfo) { + for i := 0; i < maxKeys; { + var fi FileInfo + fi, entries.IsTruncated = leastEntry(entriesCh, readQuorum) + if !entries.IsTruncated { + break + } + entries.Files = append(entries.Files, fi) + i++ + } + return entries +} - var getObjectInfoDirs []func(context.Context, string, string) (ObjectInfo, error) - // Verify prefixes in all sets. +// Starts a walk channel across all disks and returns a slice. +func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh chan struct{}) []FileInfoCh { + var entryChs []FileInfoCh for _, set := range s.sets { - getObjectInfoDirs = append(getObjectInfoDirs, set.getObjectInfoDir) + for _, disk := range set.getDisks() { + if disk == nil { + // Disk can be offline + continue + } + entryCh, err := disk.Walk(bucket, prefix, marker, recursive, xlMetaJSONFile, readMetadata, endWalkCh) + if err != nil { + // Disk walk returned error, ignore it. + continue + } + entryChs = append(entryChs, FileInfoCh{ + Ch: entryCh, + }) + } + } + return entryChs +} + +// ListObjects - implements listing of objects across disks, each disk is indepenently +// walked and merged at this layer. Resulting value through the merge process sends +// the data in lexically sorted order. +func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { + if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil { + return loi, err + } + + // Marker is set validate pre-condition. + if marker != "" { + // Marker not common with prefix is not implemented. Send an empty response + if !hasPrefix(marker, prefix) { + return loi, nil + } + } + + // With max keys of zero we have reached eof, return right here. + if maxKeys == 0 { + return loi, nil + } + + // For delimiter and prefix as '/' we do not list anything at all + // since according to s3 spec we stop at the 'delimiter' + // along // with the prefix. On a flat namespace with 'prefix' + // as '/' we don't have any entries, since all the keys are + // of form 'keyName/...' + if delimiter == slashSeparator && prefix == slashSeparator { + return loi, nil + } + + // Over flowing count - reset to maxObjectList. + if maxKeys < 0 || maxKeys > maxObjectList { + maxKeys = maxObjectList } - var getObjectInfo = func(ctx context.Context, bucket string, entry string) (ObjectInfo, error) { - return s.getHashedSet(entry).getObjectInfo(ctx, bucket, entry) + // Default is recursive, if delimiter is set then list non recursive. + recursive := true + if delimiter == slashSeparator { + recursive = false + } + + entryChs, endWalkCh := s.pool.Release(listParams{bucket, recursive, marker, prefix}) + if entryChs == nil { + endWalkCh = make(chan struct{}) + entryChs = s.startMergeWalks(context.Background(), bucket, prefix, marker, recursive, endWalkCh) + } + + entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet/2) + if len(entries.Files) == 0 { + return loi, nil } - return listObjects(ctx, s, bucket, prefix, marker, delimiter, maxKeys, s.listPool, listDir, getObjectInfo, getObjectInfoDirs...) + loi.Objects = make([]ObjectInfo, len(entries.Files)) + loi.IsTruncated = entries.IsTruncated + if loi.IsTruncated { + loi.NextMarker = entries.Files[len(entries.Files)-1].Name + } + + for _, entry := range entries.Files { + var objInfo ObjectInfo + if hasSuffix(entry.Name, slashSeparator) { + if !recursive { + loi.Prefixes = append(loi.Prefixes, entry.Name) + continue + } + objInfo = ObjectInfo{ + Bucket: bucket, + Name: entry.Name, + IsDir: true, + } + } else { + objInfo = ObjectInfo{ + IsDir: false, + Bucket: bucket, + Name: entry.Name, + ModTime: entry.ModTime, + Size: entry.Size, + ContentType: entry.Metadata["content-type"], + ContentEncoding: entry.Metadata["content-encoding"], + } + + // Extract etag from metadata. + objInfo.ETag = extractETag(entry.Metadata) + + // All the parts per object. + objInfo.Parts = entry.Parts + + // etag/md5Sum has already been extracted. We need to + // remove to avoid it from appearing as part of + // response headers. e.g, X-Minio-* or X-Amz-*. + objInfo.UserDefined = cleanMetadata(entry.Metadata) + + // Update storage class + if sc, ok := entry.Metadata[amzStorageClass]; ok { + objInfo.StorageClass = sc + } else { + objInfo.StorageClass = globalMinioDefaultStorageClass + } + } + loi.Objects = append(loi.Objects, objInfo) + } + if loi.IsTruncated { + s.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, entryChs, endWalkCh) + } + return loi, nil } func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { @@ -1301,7 +1516,7 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObj if !ok { break } - if err := healObjectFn(bucket, strings.TrimSuffix(walkResult.entry, slashSeparator+xlMetaJSONFile)); err != nil { + if err := healObjectFn(bucket, walkResult.entry); err != nil { return toObjectErr(err, bucket, walkResult.entry) } if walkResult.end {