diff --git a/pkg/fs/definitions.go b/pkg/fs/definitions.go index 93924cdc5..96c31a4ef 100644 --- a/pkg/fs/definitions.go +++ b/pkg/fs/definitions.go @@ -126,24 +126,6 @@ type ListObjectsResult struct { Prefixes []string } -type listObjectsReq struct { - Bucket string - Prefix string - Marker string - Delimiter string - MaxKeys int -} - -type listServiceReq struct { - req listObjectsReq - respCh chan ListObjectsResult -} - -type listWorkerReq struct { - req listObjectsReq - respCh chan ListObjectsResult -} - // CompletePart - completed part container type CompletePart struct { PartNumber int diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 54a5cd01d..6a3e98dcd 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -18,6 +18,7 @@ package fs import ( "errors" + "hash/fnv" "os" "path/filepath" "strings" @@ -26,38 +27,59 @@ import ( "github.com/minio/minio-xl/pkg/probe" ) -func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, *probe.Error) { - bucket := startReq.Bucket - prefix := startReq.Prefix - marker := startReq.Marker - delimiter := startReq.Delimiter +type listObjectsParams struct { + Bucket string + Prefix string + Marker string + Delimiter string + MaxKeys int +} + +type listServiceReq struct { + reqParams listObjectsParams + respCh chan ListObjectsResult +} + +type listWorkerReq struct { + respCh chan ListObjectsResult +} + +// listObjects - list objects lists objects upto maxKeys for a given +// prefix. +func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) { quitWalker := make(chan bool) reqCh := make(chan listWorkerReq) walkerCh := make(chan ObjectMetadata) go func() { - var rootPath string + defer close(walkerCh) + var walkPath string bucketPath := filepath.Join(fs.path, bucket) - trimBucketPathPrefix := bucketPath + string(os.PathSeparator) - prefixPath := trimBucketPathPrefix + prefix + // Bucket path prefix should always end with a separator. + bucketPathPrefix := bucketPath + string(os.PathSeparator) + prefixPath := bucketPathPrefix + prefix st, err := os.Stat(prefixPath) if err != nil && os.IsNotExist(err) { - rootPath = bucketPath + walkPath = bucketPath } else { if st.IsDir() && !strings.HasSuffix(prefix, delimiter) { - rootPath = bucketPath + walkPath = bucketPath } else { - rootPath = prefixPath + walkPath = prefixPath } } - filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { - if path == rootPath { + filepath.Walk(walkPath, func(path string, info os.FileInfo, err error) error { + // We don't need to list the walk path. + if path == walkPath { return nil } + // For all incoming directories add a ending separator. if info.IsDir() { path = path + string(os.PathSeparator) } - objectName := strings.TrimPrefix(path, trimBucketPathPrefix) + // Extract object name. + objectName := strings.TrimPrefix(path, bucketPathPrefix) if strings.HasPrefix(objectName, prefix) { + // For objectName lesser than marker, ignore. if marker >= objectName { return nil } @@ -68,52 +90,56 @@ func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, Size: info.Size(), } select { + // Send object on walker channel. case walkerCh <- object: - // Do nothing case <-quitWalker: - // Returning error ends the Walk() - return errors.New("Ending") + // Returning error ends the file tree Walk(). + return errors.New("Quit list worker.") } + // If delimiter is set, we stop if current path is a + // directory. if delimiter != "" && info.IsDir() { return filepath.SkipDir } } return nil }) - close(walkerCh) }() + go func() { - resp := ListObjectsResult{} for { select { + // Timeout after 10 seconds if request did not arrive for + // the given list parameters. case <-time.After(10 * time.Second): - quitWalker <- true - timeoutReq := listObjectsReq{bucket, prefix, marker, delimiter, 0} - fs.timeoutReqCh <- timeoutReq - // FIXME: can there be a race such that sender on reqCh panics? + quitWalker <- true // Quit file path walk if running. + // Send back the hash for this request. + fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter) return case req, ok := <-reqCh: if !ok { + // If the request channel is closed, no more + // requests return here. return } - resp = ListObjectsResult{} - resp.Objects = make([]ObjectMetadata, 0) - resp.Prefixes = make([]string, 0) - count := 0 + resp := ListObjectsResult{} + var count int for object := range walkerCh { - if count == req.req.MaxKeys { + if count == maxKeys { resp.IsTruncated = true break } + // If object is a directory. if object.Mode.IsDir() { if delimiter == "" { - // Skip directories for recursive list + // Skip directories for recursive listing. continue } resp.Prefixes = append(resp.Prefixes, object.Object) } else { resp.Objects = append(resp.Objects, object) } + // Set the next marker for the next request. resp.NextMarker = object.Object count++ } @@ -124,59 +150,88 @@ func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, return reqCh, nil } -func (fs *Filesystem) startListService() *probe.Error { - listServiceReqCh := make(chan listServiceReq) - timeoutReqCh := make(chan listObjectsReq) - reqToListWorkerReqCh := make(map[string](chan<- listWorkerReq)) - reqToStr := func(bucket string, prefix string, marker string, delimiter string) string { - return strings.Join([]string{bucket, prefix, marker, delimiter}, ":") +// fnvSum calculates a hash for concatenation of all input strings. +func fnvSum(elements ...string) uint32 { + fnvHash := fnv.New32a() + for _, element := range elements { + fnvHash.Write([]byte(element)) } + return fnvHash.Sum32() +} + +// listObjectsService - list objects service manages various incoming +// list object requests by delegating them to an existing listObjects +// routine or initializes a new listObjects routine. +func (fs *Filesystem) listObjectsService() *probe.Error { + // Initialize list service request channel. + listServiceReqCh := make(chan listServiceReq) + fs.listServiceReqCh = listServiceReqCh + + // Initialize timeout request channel to receive request hashes of + // timed-out requests. + timeoutReqCh := make(chan uint32) + fs.timeoutReqCh = timeoutReqCh + + // Initialize request hash to list worker map. + reqToListWorkerReqCh := make(map[uint32]chan<- listWorkerReq) + + // Start service in a go routine. go func() { for { select { - case timeoutReq := <-timeoutReqCh: - reqStr := reqToStr(timeoutReq.Bucket, timeoutReq.Prefix, timeoutReq.Marker, timeoutReq.Delimiter) - listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] + case reqHash := <-timeoutReqCh: + // For requests which have timed-out, close the worker + // channels proactively, this may happen for idle + // workers once in 10seconds. + listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash] if ok { close(listWorkerReqCh) } - delete(reqToListWorkerReqCh, reqStr) - case serviceReq := <-listServiceReqCh: - reqStr := reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, serviceReq.req.Marker, serviceReq.req.Delimiter) - listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] + delete(reqToListWorkerReqCh, reqHash) + case srvReq := <-listServiceReqCh: + // Save the params for readability. + bucket := srvReq.reqParams.Bucket + prefix := srvReq.reqParams.Prefix + marker := srvReq.reqParams.Marker + delimiter := srvReq.reqParams.Delimiter + maxKeys := srvReq.reqParams.MaxKeys + + // Generate hash. + reqHash := fnvSum(bucket, prefix, marker, delimiter) + listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash] if !ok { var err *probe.Error - listWorkerReqCh, err = fs.listWorker(serviceReq.req) + listWorkerReqCh, err = fs.listObjects(bucket, prefix, marker, delimiter, maxKeys) if err != nil { - serviceReq.respCh <- ListObjectsResult{} + srvReq.respCh <- ListObjectsResult{} return } - reqToListWorkerReqCh[reqStr] = listWorkerReqCh + reqToListWorkerReqCh[reqHash] = listWorkerReqCh } respCh := make(chan ListObjectsResult) - listWorkerReqCh <- listWorkerReq{serviceReq.req, respCh} + listWorkerReqCh <- listWorkerReq{respCh} resp, ok := <-respCh if !ok { - serviceReq.respCh <- ListObjectsResult{} + srvReq.respCh <- ListObjectsResult{} return } - delete(reqToListWorkerReqCh, reqStr) + delete(reqToListWorkerReqCh, reqHash) if !resp.IsTruncated { close(listWorkerReqCh) } else { - reqStr = reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, resp.NextMarker, serviceReq.req.Delimiter) - reqToListWorkerReqCh[reqStr] = listWorkerReqCh + nextMarker := resp.NextMarker + reqHash = fnvSum(bucket, prefix, nextMarker, delimiter) + reqToListWorkerReqCh[reqHash] = listWorkerReqCh } - serviceReq.respCh <- resp + srvReq.respCh <- resp } } }() - fs.timeoutReqCh = timeoutReqCh - fs.listServiceReqCh = listServiceReqCh return nil } -// ListObjects - +// ListObjects - lists all objects for a given prefix, returns upto +// maxKeys number of objects per call. func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -195,24 +250,24 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe return ListObjectsResult{}, probe.NewError(e) } - req := listObjectsReq{} - req.Bucket = bucket - req.Prefix = filepath.FromSlash(prefix) - req.Marker = filepath.FromSlash(marker) - req.Delimiter = filepath.FromSlash(delimiter) - req.MaxKeys = maxKeys + reqParams := listObjectsParams{} + reqParams.Bucket = bucket + reqParams.Prefix = filepath.FromSlash(prefix) + reqParams.Marker = filepath.FromSlash(marker) + reqParams.Delimiter = filepath.FromSlash(delimiter) + reqParams.MaxKeys = maxKeys respCh := make(chan ListObjectsResult) - fs.listServiceReqCh <- listServiceReq{req, respCh} + fs.listServiceReqCh <- listServiceReq{reqParams, respCh} resp := <-respCh - for i := 0; i < len(resp.Prefixes); i++ { + for i := range resp.Prefixes { resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i]) } - for i := 0; i < len(resp.Objects); i++ { + for i := range resp.Objects { resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object) } - if req.Delimiter == "" { + if reqParams.Delimiter == "" { // This element is set only if you have delimiter set. // If response does not include the NextMaker and it is // truncated, you can use the value of the last Key in the diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 39f6a44a1..99248299c 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -34,7 +34,7 @@ type Filesystem struct { multiparts *Multiparts buckets *Buckets listServiceReqCh chan<- listServiceReq - timeoutReqCh chan<- listObjectsReq + timeoutReqCh chan<- uint32 } // Buckets holds acl information @@ -106,8 +106,7 @@ func New(rootPath string) (Filesystem, *probe.Error) { fs.minFreeDisk = 10 // Start list goroutine. - err = fs.startListService() - if err != nil { + if err = fs.listObjectsService(); err != nil { return Filesystem{}, err.Trace(rootPath) } // Return here.