From 9e18bfa60e99eac3caaadacc686ab783a93a4059 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Tue, 10 Nov 2015 03:10:11 -0800 Subject: [PATCH 1/6] listObjects: Channel based ftw - initial implementation. --- api-response.go | 21 +- bucket-handlers.go | 12 +- pkg/fs/definitions.go | 25 ++ pkg/fs/fs-bucket-listobjects.go | 503 +++++++++++--------------------- pkg/fs/fs.go | 27 +- 5 files changed, 239 insertions(+), 349 deletions(-) diff --git a/api-response.go b/api-response.go index d013d168a..c3dfb0bfd 100644 --- a/api-response.go +++ b/api-response.go @@ -92,7 +92,8 @@ func generateAccessControlPolicyResponse(acl fs.BucketACL) AccessControlPolicyRe } // generates an ListObjects response for the said bucket with other enumerated options. -func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, bucketResources fs.BucketResourcesMetadata) ListObjectsResponse { +// func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, bucketResources fs.BucketResourcesMetadata) ListObjectsResponse { +func generateListObjectsResponse(bucket string, req fs.ListObjectsReq, resp fs.ListObjectsResp) ListObjectsResponse { var contents []*Object var prefixes []*CommonPrefix var owner = Owner{} @@ -101,7 +102,7 @@ func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, buc owner.ID = "minio" owner.DisplayName = "minio" - for _, object := range objects { + for _, object := range resp.Objects { var content = &Object{} if object.Object == "" { continue @@ -117,13 +118,15 @@ func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, buc // TODO - support EncodingType in xml decoding data.Name = bucket data.Contents = contents - data.MaxKeys = bucketResources.Maxkeys - data.Prefix = bucketResources.Prefix - data.Delimiter = bucketResources.Delimiter - data.Marker = bucketResources.Marker - data.NextMarker = bucketResources.NextMarker - data.IsTruncated = bucketResources.IsTruncated - for _, prefix := range bucketResources.CommonPrefixes { + + data.MaxKeys = req.MaxKeys + data.Prefix = req.Prefix + data.Delimiter = req.Delimiter + data.Marker = req.Marker + + data.NextMarker = resp.NextMarker + data.IsTruncated = resp.IsTruncated + for _, prefix := range resp.Prefixes { var prefixItem = &CommonPrefix{} prefixItem.Prefix = prefix prefixes = append(prefixes, prefixItem) diff --git a/bucket-handlers.go b/bucket-handlers.go index c2f0d74a1..146143d70 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -137,10 +137,16 @@ func (api CloudStorageAPI) ListObjectsHandler(w http.ResponseWriter, req *http.R resources.Maxkeys = maxObjectList } - objects, resources, err := api.Filesystem.ListObjects(bucket, resources) + listReq := fs.ListObjectsReq{ + Prefix: resources.Prefix, + Marker: resources.Marker, + Delimiter: resources.Delimiter, + MaxKeys: resources.Maxkeys, + } + listResp, err := api.Filesystem.ListObjects(bucket, listReq) if err == nil { - // Generate response - response := generateListObjectsResponse(bucket, objects, resources) + // generate response + response := generateListObjectsResponse(bucket, listReq, listResp) encodedSuccessResponse := encodeSuccessResponse(response) // Write headers setCommonHeaders(w) diff --git a/pkg/fs/definitions.go b/pkg/fs/definitions.go index bd8715056..6c656167f 100644 --- a/pkg/fs/definitions.go +++ b/pkg/fs/definitions.go @@ -130,6 +130,31 @@ type BucketResourcesMetadata struct { CommonPrefixes []string } +type ListObjectsReq struct { + Bucket string + Prefix string + Marker string + Delimiter string + MaxKeys int +} + +type ListObjectsResp struct { + IsTruncated bool + NextMarker string + Objects []ObjectMetadata + Prefixes []string +} + +type listServiceReq struct { + req ListObjectsReq + respCh chan ListObjectsResp +} + +type listWorkerReq struct { + req ListObjectsReq + respCh chan ListObjectsResp +} + // CompletePart - completed part container type CompletePart struct { PartNumber int diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 825d5828a..041067727 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -17,363 +17,214 @@ package fs import ( - "io/ioutil" + "errors" + "fmt" "os" "path/filepath" - "runtime" - "sort" "strings" + "time" "github.com/minio/minio-xl/pkg/probe" ) -// ListObjects - GET bucket (list objects) -func (fs Filesystem) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() - if !IsValidBucketName(bucket) { - return nil, resources, probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - if resources.Prefix != "" && IsValidObjectName(resources.Prefix) == false { - return nil, resources, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: resources.Prefix}) - } - - bucket = fs.denormalizeBucket(bucket) - - p := bucketDir{} - rootPrefix := filepath.Join(fs.path, bucket) - // check bucket exists - if _, err := os.Stat(rootPrefix); os.IsNotExist(err) { - return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket}) - } - - p.root = rootPrefix - /// automatically treat incoming "/" as "\\" on windows due to its path constraints. - if runtime.GOOS == "windows" { - if resources.Prefix != "" { - resources.Prefix = strings.Replace(resources.Prefix, "/", string(os.PathSeparator), -1) - } - if resources.Delimiter != "" { - resources.Delimiter = strings.Replace(resources.Delimiter, "/", string(os.PathSeparator), -1) - } - if resources.Marker != "" { - resources.Marker = strings.Replace(resources.Marker, "/", string(os.PathSeparator), -1) - } +func (fs Filesystem) listWorker(startReq ListObjectsReq) (chan<- listWorkerReq, *probe.Error) { + Separator := string(os.PathSeparator) + bucket := startReq.Bucket + prefix := startReq.Prefix + marker := startReq.Marker + delimiter := startReq.Delimiter + quit := make(chan bool) + if marker != "" { + return nil, probe.NewError(errors.New("Not supported")) } - - // if delimiter is supplied and not prefix then we are the very top level, list everything and move on. - if resources.Delimiter != "" && resources.Prefix == "" { - files, err := ioutil.ReadDir(rootPrefix) - if err != nil { - if os.IsNotExist(err) { - return nil, resources, probe.NewError(BucketNotFound{Bucket: bucket}) - } - return nil, resources, probe.NewError(err) - } - for _, fl := range files { - if strings.HasSuffix(fl.Name(), "$multiparts") { - continue - } - p.files = append(p.files, contentInfo{ - Prefix: fl.Name(), - Size: fl.Size(), - Mode: fl.Mode(), - ModTime: fl.ModTime(), - FileInfo: fl, - }) - } + if delimiter != "" && delimiter != Separator { + return nil, probe.NewError(errors.New("Not supported")) } - - // If delimiter and prefix is supplied make sure that paging doesn't go deep, treat it as simple directory listing. - if resources.Delimiter != "" && resources.Prefix != "" { - if !strings.HasSuffix(resources.Prefix, resources.Delimiter) { - fl, err := os.Stat(filepath.Join(rootPrefix, resources.Prefix)) - if err != nil { - if os.IsNotExist(err) { - return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) - } - return nil, resources, probe.NewError(err) - } - p.files = append(p.files, contentInfo{ - Prefix: resources.Prefix, - Size: fl.Size(), - Mode: os.ModeDir, - ModTime: fl.ModTime(), - FileInfo: fl, - }) - } else { - var prefixPath string - if runtime.GOOS == "windows" { - prefixPath = rootPrefix + string(os.PathSeparator) + resources.Prefix - } else { - prefixPath = rootPrefix + string(os.PathSeparator) + resources.Prefix - } - files, err := ioutil.ReadDir(prefixPath) - if err != nil { - switch err := err.(type) { - case *os.PathError: - if err.Op == "open" { - return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) - } - } - return nil, resources, probe.NewError(err) - } - for _, fl := range files { - if strings.HasSuffix(fl.Name(), "$multiparts") { - continue - } - prefix := fl.Name() - if resources.Prefix != "" { - prefix = filepath.Join(resources.Prefix, fl.Name()) - } - p.files = append(p.files, contentInfo{ - Prefix: prefix, - Size: fl.Size(), - Mode: fl.Mode(), - ModTime: fl.ModTime(), - FileInfo: fl, - }) - } - } - } - if resources.Delimiter == "" { - var files []contentInfo - getAllFiles := func(fp string, fl os.FileInfo, err error) error { - // If any error return back quickly - if err != nil { - return err - } - if strings.HasSuffix(fp, "$multiparts") { + reqCh := make(chan listWorkerReq) + walkerCh := make(chan ObjectMetadata) + go func() { + rootPath := filepath.Join(fs.path, bucket, prefix) + stripPath := filepath.Join(fs.path, bucket) + Separator + filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { + if path == rootPath { return nil } - // if file pointer equals to rootPrefix - discard it - if fp == p.root { - return nil + if info.IsDir() { + path = path + Separator } - if len(files) > resources.Maxkeys { - return ErrSkipFile + objectName := strings.TrimPrefix(path, stripPath) + object := ObjectMetadata{ + Object: objectName, + Created: info.ModTime(), + Mode: info.Mode(), + Size: info.Size(), } - // Split the root prefix from the incoming file pointer - realFp := "" - if runtime.GOOS == "windows" { - if splits := strings.Split(fp, (p.root + string(os.PathSeparator))); len(splits) > 1 { - realFp = splits[1] - } - } else { - if splits := strings.Split(fp, (p.root + string(os.PathSeparator))); len(splits) > 1 { - realFp = splits[1] - } + select { + case walkerCh <- object: + // do nothings + case <-quit: + fmt.Println("walker got quit") + // returning error ends the Walk() + return errors.New("Ending") } - // If path is a directory and has a prefix verify if the file pointer - // has the prefix if it does not skip the directory. - if fl.Mode().IsDir() { - if resources.Prefix != "" { - // Skip the directory on following situations - // - when prefix is part of file pointer along with the root path - // - when file pointer is part of the prefix along with root path - if !strings.HasPrefix(fp, filepath.Join(p.root, resources.Prefix)) && - !strings.HasPrefix(filepath.Join(p.root, resources.Prefix), fp) { - return ErrSkipDir - } - } - } - // If path is a directory and has a marker verify if the file split file pointer - // is lesser than the Marker top level directory if yes skip it. - if fl.Mode().IsDir() { - if resources.Marker != "" { - if realFp != "" { - // For windows split with its own os.PathSeparator - if runtime.GOOS == "windows" { - if realFp < strings.Split(resources.Marker, string(os.PathSeparator))[0] { - return ErrSkipDir - } - } else { - if realFp < strings.Split(resources.Marker, string(os.PathSeparator))[0] { - return ErrSkipDir - } - } - } - } + if delimiter == Separator && info.IsDir() { + return filepath.SkipDir } - // If regular file verify - if fl.Mode().IsRegular() { - // If marker is present this will be used to check if filepointer is - // lexically higher than then Marker - if realFp != "" { - if resources.Marker != "" { - if realFp > resources.Marker { - files = append(files, contentInfo{ - Prefix: realFp, - Size: fl.Size(), - Mode: fl.Mode(), - ModTime: fl.ModTime(), - FileInfo: fl, - }) + return nil + }) + close(walkerCh) + }() + go func() { + resp := ListObjectsResp{} + for { + select { + case <-time.After(10 * time.Second): + fmt.Println("worker got timeout") + quit <- true + timeoutReq := ListObjectsReq{bucket, prefix, marker, delimiter, 0} + fmt.Println("after timeout", fs) + fs.timeoutReqCh <- timeoutReq + // FIXME: can there be a race such that sender on reqCh panics? + return + case req := <-reqCh: + resp = ListObjectsResp{} + resp.Objects = make([]ObjectMetadata, 0) + resp.Prefixes = make([]string, 0) + count := 0 + for object := range walkerCh { + if object.Mode.IsDir() { + if delimiter == "" { + // skip directories for recursive list + continue } + resp.Prefixes = append(resp.Prefixes, object.Object) } else { - files = append(files, contentInfo{ - Prefix: realFp, - Size: fl.Size(), - Mode: fl.Mode(), - ModTime: fl.ModTime(), - FileInfo: fl, - }) + resp.Objects = append(resp.Objects, object) + } + resp.NextMarker = object.Object + count++ + if count == req.req.MaxKeys { + resp.IsTruncated = true + break } } + fmt.Println("response objects: ", len(resp.Objects)) + marker = resp.NextMarker + req.respCh <- resp } - // If file is a symlink follow it and populate values. - if fl.Mode()&os.ModeSymlink == os.ModeSymlink { - st, err := os.Stat(fp) - if err != nil { - return nil + } + }() + return reqCh, nil +} + +func (fs *Filesystem) startListService() *probe.Error { + fmt.Println("startListService starting") + listServiceReqCh := make(chan listServiceReq) + timeoutReqCh := make(chan ListObjectsReq) + reqToListWorkerReqCh := make(map[string](chan<- listWorkerReq)) + reqToStr := func(bucket string, prefix string, marker string, delimiter string) string { + return strings.Join([]string{bucket, prefix, marker, delimiter}, ":") + } + go func() { + for { + select { + case timeoutReq := <-timeoutReqCh: + fmt.Println("listservice got timeout on ", timeoutReq) + reqStr := reqToStr(timeoutReq.Bucket, timeoutReq.Prefix, timeoutReq.Marker, timeoutReq.Delimiter) + listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] + if ok { + close(listWorkerReqCh) } - // If marker is present this will be used to check if filepointer is - // lexically higher than then Marker - if realFp != "" { - if resources.Marker != "" { - if realFp > resources.Marker { - files = append(files, contentInfo{ - Prefix: realFp, - Size: st.Size(), - Mode: st.Mode(), - ModTime: st.ModTime(), - FileInfo: st, - }) - } - } else { - files = append(files, contentInfo{ - Prefix: realFp, - Size: st.Size(), - Mode: st.Mode(), - ModTime: st.ModTime(), - FileInfo: st, - }) + delete(reqToListWorkerReqCh, reqStr) + case serviceReq := <-listServiceReqCh: + fmt.Println("serviceReq received", serviceReq) + fmt.Println("sending to listservicereqch", fs) + + reqStr := reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, serviceReq.req.Marker, serviceReq.req.Delimiter) + listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] + if !ok { + var err *probe.Error + listWorkerReqCh, err = fs.listWorker(serviceReq.req) + if err != nil { + fmt.Println("listWorker returned error", err) + serviceReq.respCh <- ListObjectsResp{} + return } + reqToListWorkerReqCh[reqStr] = listWorkerReqCh } + respCh := make(chan ListObjectsResp) + listWorkerReqCh <- listWorkerReq{serviceReq.req, respCh} + resp, ok := <-respCh + if !ok { + serviceReq.respCh <- ListObjectsResp{} + fmt.Println("listWorker resp was not ok") + return + } + delete(reqToListWorkerReqCh, reqStr) + if !resp.IsTruncated { + close(listWorkerReqCh) + } else { + reqStr = reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, resp.NextMarker, serviceReq.req.Delimiter) + reqToListWorkerReqCh[reqStr] = listWorkerReqCh + } + serviceReq.respCh <- resp } - p.files = files - return nil - } - // If no delimiter is specified, crawl through everything. - err := Walk(rootPrefix, getAllFiles) - if err != nil { - if os.IsNotExist(err) { - return nil, resources, probe.NewError(ObjectNotFound{Bucket: bucket, Object: resources.Prefix}) - } - return nil, resources, probe.NewError(err) } - } + }() + fs.timeoutReqCh = timeoutReqCh + fs.listServiceReqCh = listServiceReqCh + return nil +} - var metadataList []ObjectMetadata - var metadata ObjectMetadata +// ListObjects - +func (fs Filesystem) ListObjects(bucket string, req ListObjectsReq) (ListObjectsResp, *probe.Error) { + fs.lock.Lock() + defer fs.lock.Unlock() - // Filter objects - for _, content := range p.files { - if len(metadataList) == resources.Maxkeys { - resources.IsTruncated = true - if resources.IsTruncated && resources.Delimiter != "" { - resources.NextMarker = metadataList[len(metadataList)-1].Object - } - break - } - if content.Prefix > resources.Marker { - var err *probe.Error - metadata, resources, err = fs.filterObjects(bucket, content, resources) - if err != nil { - return nil, resources, err.Trace() - } - // If windows replace all the incoming paths to API compatible paths - if runtime.GOOS == "windows" { - metadata.Object = sanitizeWindowsPath(metadata.Object) - } - if metadata.Bucket != "" { - metadataList = append(metadataList, metadata) - } + Separator := string(os.PathSeparator) + if !IsValidBucketName(bucket) { + return ListObjectsResp{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) + } + + bucket = fs.denormalizeBucket(bucket) + rootPrefix := filepath.Join(fs.path, bucket) + // check bucket exists + if _, e := os.Stat(rootPrefix); e != nil { + if os.IsNotExist(e) { + return ListObjectsResp{}, probe.NewError(BucketNotFound{Bucket: bucket}) } + return ListObjectsResp{}, probe.NewError(e) } - // Sanitize common prefixes back into API compatible paths - if runtime.GOOS == "windows" { - resources.CommonPrefixes = sanitizeWindowsPaths(resources.CommonPrefixes...) + + canonicalize := func(str string) string { + return strings.Replace(str, "/", string(os.PathSeparator), -1) + } + decanonicalize := func(str string) string { + return strings.Replace(str, string(os.PathSeparator), "/", -1) } - return metadataList, resources, nil -} -func (fs Filesystem) filterObjects(bucket string, content contentInfo, resources BucketResourcesMetadata) (ObjectMetadata, BucketResourcesMetadata, *probe.Error) { - var err *probe.Error - var metadata ObjectMetadata + req.Bucket = bucket + req.Prefix = canonicalize(req.Prefix) + req.Marker = canonicalize(req.Marker) + req.Delimiter = canonicalize(req.Delimiter) - name := content.Prefix - switch true { - // Both delimiter and Prefix is present - case resources.Delimiter != "" && resources.Prefix != "": - if strings.HasPrefix(name, resources.Prefix) { - trimmedName := strings.TrimPrefix(name, resources.Prefix) - delimitedName := delimiter(trimmedName, resources.Delimiter) - switch true { - case name == resources.Prefix: - // Use resources.Prefix to filter out delimited file - metadata, err = getMetadata(fs.path, bucket, name) - if err != nil { - return ObjectMetadata{}, resources, err.Trace() - } - if metadata.Mode.IsDir() { - resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter) - return ObjectMetadata{}, resources, nil - } - case delimitedName == content.FileInfo.Name(): - // Use resources.Prefix to filter out delimited files - metadata, err = getMetadata(fs.path, bucket, name) - if err != nil { - return ObjectMetadata{}, resources, err.Trace() - } - if metadata.Mode.IsDir() { - resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter) - return ObjectMetadata{}, resources, nil - } - case delimitedName != "": - resources.CommonPrefixes = append(resources.CommonPrefixes, resources.Prefix+delimitedName) - } - } - // Delimiter present and Prefix is absent - case resources.Delimiter != "" && resources.Prefix == "": - delimitedName := delimiter(name, resources.Delimiter) - switch true { - case delimitedName == "": - metadata, err = getMetadata(fs.path, bucket, name) - if err != nil { - return ObjectMetadata{}, resources, err.Trace() - } - if metadata.Mode.IsDir() { - resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter) - return ObjectMetadata{}, resources, nil - } - case delimitedName == content.FileInfo.Name(): - metadata, err = getMetadata(fs.path, bucket, name) - if err != nil { - return ObjectMetadata{}, resources, err.Trace() - } - if metadata.Mode.IsDir() { - resources.CommonPrefixes = append(resources.CommonPrefixes, name+resources.Delimiter) - return ObjectMetadata{}, resources, nil - } - case delimitedName != "": - resources.CommonPrefixes = append(resources.CommonPrefixes, delimitedName) - } - // Delimiter is absent and only Prefix is present - case resources.Delimiter == "" && resources.Prefix != "": - if strings.HasPrefix(name, resources.Prefix) { - // Do not strip prefix object output - metadata, err = getMetadata(fs.path, bucket, name) - if err != nil { - return ObjectMetadata{}, resources, err.Trace() - } - } - default: - metadata, err = getMetadata(fs.path, bucket, name) - if err != nil { - return ObjectMetadata{}, resources, err.Trace() - } + if req.Delimiter != "" && req.Delimiter != Separator { + return ListObjectsResp{}, probe.NewError(errors.New("not supported")) + } + + respCh := make(chan ListObjectsResp) + fs.listServiceReqCh <- listServiceReq{req, respCh} + resp := <-respCh + + for i := 0; i < len(resp.Prefixes); i++ { + resp.Prefixes[i] = decanonicalize(resp.Prefixes[i]) + } + for i := 0; i < len(resp.Objects); i++ { + resp.Objects[i].Object = decanonicalize(resp.Objects[i].Object) + } + if req.Delimiter == "" { + // unset NextMaker for recursive list + resp.NextMarker = "" } - sortUnique(sort.StringSlice(resources.CommonPrefixes)) - return metadata, resources, nil + return resp, nil } diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 76762b672..8f6f7571c 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -27,12 +27,14 @@ import ( // Filesystem - local variables type Filesystem struct { - path string - minFreeDisk int64 - maxBuckets int - lock *sync.Mutex - multiparts *Multiparts - buckets *Buckets + path string + minFreeDisk int64 + maxBuckets int + lock *sync.Mutex + multiparts *Multiparts + buckets *Buckets + listServiceReqCh chan<- listServiceReq + timeoutReqCh chan<- ListObjectsReq } // Buckets holds acl information @@ -92,11 +94,10 @@ func New(rootPath string) (Filesystem, *probe.Error) { return Filesystem{}, err.Trace() } } - fs := Filesystem{lock: new(sync.Mutex)} - fs.path = rootPath - fs.multiparts = multiparts - fs.buckets = buckets - + a := Filesystem{lock: new(sync.Mutex)} + a.path = rootPath + a.multiparts = multiparts + a.buckets = buckets /// Defaults // maximum buckets to be listed from list buckets. @@ -104,6 +105,10 @@ func New(rootPath string) (Filesystem, *probe.Error) { // minium free disk required for i/o operations to succeed. fs.minFreeDisk = 10 + err = fs.startListService() + if err != nil { + return Filesystem{}, err.Trace(rootPath) + } // Return here. return fs, nil } From 682020ef2fbc0ee1a1126f238d7a61b1f2c2b262 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 19 Jan 2016 17:49:48 -0800 Subject: [PATCH 2/6] listObjects: Channel based changes. Supports: - prefixes - marker --- api-errors.go | 1 - api-resources.go | 12 +-- api-response.go | 11 +-- bucket-handlers.go | 21 ++--- pkg/fs/api_suite_nix_test.go | 121 +++++++++--------------- pkg/fs/api_suite_windows_test.go | 123 +++++++++---------------- pkg/fs/definitions.go | 37 +++----- pkg/fs/fs-bucket-listobjects.go | 152 +++++++++++++++---------------- pkg/fs/fs.go | 11 ++- server-config.go | 18 ---- server_fs_test.go | 7 +- update-main.go | 6 -- 12 files changed, 204 insertions(+), 316 deletions(-) diff --git a/api-errors.go b/api-errors.go index b1d5b76b2..cd1d1c8c7 100644 --- a/api-errors.go +++ b/api-errors.go @@ -53,7 +53,6 @@ const ( InvalidBucketName InvalidDigest InvalidRange - InvalidRequest InvalidMaxKeys InvalidMaxUploads InvalidMaxParts diff --git a/api-resources.go b/api-resources.go index e76963b91..1a1ed4d39 100644 --- a/api-resources.go +++ b/api-resources.go @@ -24,12 +24,12 @@ import ( ) // parse bucket url queries -func getBucketResources(values url.Values) (v fs.BucketResourcesMetadata) { - v.Prefix = values.Get("prefix") - v.Marker = values.Get("marker") - v.Maxkeys, _ = strconv.Atoi(values.Get("max-keys")) - v.Delimiter = values.Get("delimiter") - v.EncodingType = values.Get("encoding-type") +func getBucketResources(values url.Values) (prefix, marker, delimiter string, maxkeys int, encodingType string) { + prefix = values.Get("prefix") + marker = values.Get("marker") + delimiter = values.Get("delimiter") + maxkeys, _ = strconv.Atoi(values.Get("max-keys")) + encodingType = values.Get("encoding-type") return } diff --git a/api-response.go b/api-response.go index c3dfb0bfd..f29013754 100644 --- a/api-response.go +++ b/api-response.go @@ -92,8 +92,7 @@ func generateAccessControlPolicyResponse(acl fs.BucketACL) AccessControlPolicyRe } // generates an ListObjects response for the said bucket with other enumerated options. -// func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, bucketResources fs.BucketResourcesMetadata) ListObjectsResponse { -func generateListObjectsResponse(bucket string, req fs.ListObjectsReq, resp fs.ListObjectsResp) ListObjectsResponse { +func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKeys int, resp fs.ListObjectsResult) ListObjectsResponse { var contents []*Object var prefixes []*CommonPrefix var owner = Owner{} @@ -119,10 +118,10 @@ func generateListObjectsResponse(bucket string, req fs.ListObjectsReq, resp fs.L data.Name = bucket data.Contents = contents - data.MaxKeys = req.MaxKeys - data.Prefix = req.Prefix - data.Delimiter = req.Delimiter - data.Marker = req.Marker + data.Prefix = prefix + data.Marker = marker + data.Delimiter = delimiter + data.MaxKeys = maxKeys data.NextMarker = resp.NextMarker data.IsTruncated = resp.IsTruncated diff --git a/bucket-handlers.go b/bucket-handlers.go index 146143d70..d413f7851 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -52,6 +52,7 @@ func (api CloudStorageAPI) GetBucketLocationHandler(w http.ResponseWriter, req * default: writeErrorResponse(w, req, InternalError, req.URL.Path) } + return } // TODO: Location value for LocationResponse is deliberately not used, until @@ -128,25 +129,21 @@ func (api CloudStorageAPI) ListObjectsHandler(w http.ResponseWriter, req *http.R } } } - resources := getBucketResources(req.URL.Query()) - if resources.Maxkeys < 0 { + + // TODO handle encoding type. + prefix, marker, delimiter, maxkeys, _ := getBucketResources(req.URL.Query()) + if maxkeys < 0 { writeErrorResponse(w, req, InvalidMaxKeys, req.URL.Path) return } - if resources.Maxkeys == 0 { - resources.Maxkeys = maxObjectList + if maxkeys == 0 { + maxkeys = maxObjectList } - listReq := fs.ListObjectsReq{ - Prefix: resources.Prefix, - Marker: resources.Marker, - Delimiter: resources.Delimiter, - MaxKeys: resources.Maxkeys, - } - listResp, err := api.Filesystem.ListObjects(bucket, listReq) + listResp, err := api.Filesystem.ListObjects(bucket, prefix, marker, delimiter, maxkeys) if err == nil { // generate response - response := generateListObjectsResponse(bucket, listReq, listResp) + response := generateListObjectsResponse(bucket, prefix, marker, delimiter, maxkeys, listResp) encodedSuccessResponse := encodeSuccessResponse(response) // Write headers setCommonHeaders(w) diff --git a/pkg/fs/api_suite_nix_test.go b/pkg/fs/api_suite_nix_test.go index c3b50281c..0cc108c47 100644 --- a/pkg/fs/api_suite_nix_test.go +++ b/pkg/fs/api_suite_nix_test.go @@ -165,59 +165,50 @@ func testMultipleObjectCreation(c *check.C, create func() Filesystem) { func testPaging(c *check.C, create func() Filesystem) { fs := create() fs.MakeBucket("bucket", "") - resources := BucketResourcesMetadata{} - objects, resources, err := fs.ListObjects("bucket", resources) + result, err := fs.ListObjects("bucket", "", "", "", 0) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, 0) - c.Assert(resources.IsTruncated, check.Equals, false) + c.Assert(len(result.Objects), check.Equals, 0) + c.Assert(result.IsTruncated, check.Equals, false) // check before paging occurs for i := 0; i < 5; i++ { key := "obj" + strconv.Itoa(i) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - resources.Maxkeys = 5 - resources.Prefix = "" - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "", "", 5) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, i+1) - c.Assert(resources.IsTruncated, check.Equals, false) + c.Assert(len(result.Objects), check.Equals, i+1) + c.Assert(result.IsTruncated, check.Equals, false) } // check after paging occurs pages work for i := 6; i <= 10; i++ { key := "obj" + strconv.Itoa(i) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - resources.Maxkeys = 5 - resources.Prefix = "" - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "", "", 5) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, 5) - c.Assert(resources.IsTruncated, check.Equals, true) + c.Assert(len(result.Objects), check.Equals, 5) + c.Assert(result.IsTruncated, check.Equals, true) } // check paging with prefix at end returns less objects { _, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) c.Assert(err, check.IsNil) - fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) + _, err = fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) c.Assert(err, check.IsNil) - resources.Prefix = "new" - resources.Maxkeys = 5 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "new", "", "", 5) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, 2) + c.Assert(len(result.Objects), check.Equals, 2) } // check ordering of pages { - resources.Prefix = "" - resources.Maxkeys = 1000 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "", "", 1000) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "newPrefix") - c.Assert(objects[1].Object, check.Equals, "newPrefix2") - c.Assert(objects[2].Object, check.Equals, "obj0") - c.Assert(objects[3].Object, check.Equals, "obj1") - c.Assert(objects[4].Object, check.Equals, "obj10") + c.Assert(result.Objects[0].Object, check.Equals, "newPrefix") + c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2") + c.Assert(result.Objects[2].Object, check.Equals, "obj0") + c.Assert(result.Objects[3].Object, check.Equals, "obj1") + c.Assert(result.Objects[4].Object, check.Equals, "obj10") } // check delimited results with delimiter and prefix @@ -226,72 +217,49 @@ func testPaging(c *check.C, create func() Filesystem) { c.Assert(err, check.IsNil) _, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) c.Assert(err, check.IsNil) - var prefixes []string - resources.CommonPrefixes = prefixes // allocate new everytime - resources.Delimiter = "/" - resources.Prefix = "this/is/" - resources.Maxkeys = 10 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "this/is/", "", "/", 10) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, 1) - c.Assert(resources.CommonPrefixes[0], check.Equals, "this/is/also/") + c.Assert(len(result.Objects), check.Equals, 1) + c.Assert(result.Prefixes[0], check.Equals, "this/is/also/") } time.Sleep(time.Second) // check delimited results with delimiter without prefix { - var prefixes []string - resources.CommonPrefixes = prefixes // allocate new everytime - resources.Delimiter = "/" - resources.Prefix = "" - resources.Maxkeys = 1000 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "", "/", 1000) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "newPrefix") - c.Assert(objects[1].Object, check.Equals, "newPrefix2") - c.Assert(objects[2].Object, check.Equals, "obj0") - c.Assert(objects[3].Object, check.Equals, "obj1") - c.Assert(objects[4].Object, check.Equals, "obj10") - c.Assert(resources.CommonPrefixes[0], check.Equals, "this/") + c.Assert(result.Objects[0].Object, check.Equals, "newPrefix") + c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2") + c.Assert(result.Objects[2].Object, check.Equals, "obj0") + c.Assert(result.Objects[3].Object, check.Equals, "obj1") + c.Assert(result.Objects[4].Object, check.Equals, "obj10") + c.Assert(result.Prefixes[0], check.Equals, "this/") } // check results with Marker { - var prefixes []string - resources.CommonPrefixes = prefixes // allocate new everytime - resources.Prefix = "" - resources.Marker = "newPrefix" - resources.Delimiter = "" - resources.Maxkeys = 3 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "newPrefix", "", 3) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "newPrefix2") - c.Assert(objects[1].Object, check.Equals, "obj0") - c.Assert(objects[2].Object, check.Equals, "obj1") + c.Assert(result.Objects[0].Object, check.Equals, "newPrefix2") + c.Assert(result.Objects[1].Object, check.Equals, "obj0") + c.Assert(result.Objects[2].Object, check.Equals, "obj1") } // check ordering of results with prefix { - resources.Prefix = "obj" - resources.Delimiter = "" - resources.Marker = "" - resources.Maxkeys = 1000 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "obj", "", "", 1000) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "obj0") - c.Assert(objects[1].Object, check.Equals, "obj1") - c.Assert(objects[2].Object, check.Equals, "obj10") - c.Assert(objects[3].Object, check.Equals, "obj2") - c.Assert(objects[4].Object, check.Equals, "obj3") + c.Assert(result.Objects[0].Object, check.Equals, "obj0") + c.Assert(result.Objects[1].Object, check.Equals, "obj1") + c.Assert(result.Objects[2].Object, check.Equals, "obj10") + c.Assert(result.Objects[3].Object, check.Equals, "obj2") + c.Assert(result.Objects[4].Object, check.Equals, "obj3") } // check ordering of results with prefix and no paging { - resources.Prefix = "new" - resources.Marker = "" - resources.Maxkeys = 5 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "new", "", "", 5) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "newPrefix") - c.Assert(objects[1].Object, check.Equals, "newPrefix2") + c.Assert(result.Objects[0].Object, check.Equals, "newPrefix") + c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2") } } @@ -417,11 +385,10 @@ func testListBucketsOrder(c *check.C, create func() Filesystem) { func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) { fs := create() - resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000} - objects, resources, err := fs.ListObjects("bucket", resources) + result, err := fs.ListObjects("bucket", "", "", "", 1000) c.Assert(err, check.Not(check.IsNil)) - c.Assert(resources.IsTruncated, check.Equals, false) - c.Assert(len(objects), check.Equals, 0) + c.Assert(result.IsTruncated, check.Equals, false) + c.Assert(len(result.Objects), check.Equals, 0) } func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) { diff --git a/pkg/fs/api_suite_windows_test.go b/pkg/fs/api_suite_windows_test.go index 22ebc52dc..be83e07b7 100644 --- a/pkg/fs/api_suite_windows_test.go +++ b/pkg/fs/api_suite_windows_test.go @@ -26,7 +26,6 @@ import ( "encoding/xml" "math/rand" "strconv" - "time" "gopkg.in/check.v1" ) @@ -165,59 +164,50 @@ func testMultipleObjectCreation(c *check.C, create func() Filesystem) { func testPaging(c *check.C, create func() Filesystem) { fs := create() fs.MakeBucket("bucket", "") - resources := BucketResourcesMetadata{} - objects, resources, err := fs.ListObjects("bucket", resources) + result, err := fs.ListObjects("bucket", "", "", "", 0) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, 0) - c.Assert(resources.IsTruncated, check.Equals, false) + c.Assert(len(result.Objects), check.Equals, 0) + c.Assert(result.IsTruncated, check.Equals, false) // check before paging occurs for i := 0; i < 5; i++ { key := "obj" + strconv.Itoa(i) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - resources.Maxkeys = 5 - resources.Prefix = "" - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "", "", 5) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, i+1) - c.Assert(resources.IsTruncated, check.Equals, false) + c.Assert(len(result.Objects), check.Equals, i+1) + c.Assert(result.IsTruncated, check.Equals, false) } // check after paging occurs pages work for i := 6; i <= 10; i++ { key := "obj" + strconv.Itoa(i) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - resources.Maxkeys = 5 - resources.Prefix = "" - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "", "", 5) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, 5) - c.Assert(resources.IsTruncated, check.Equals, true) + c.Assert(len(result.Objects), check.Equals, 5) + c.Assert(result.IsTruncated, check.Equals, true) } // check paging with prefix at end returns less objects { _, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) c.Assert(err, check.IsNil) - _, err = fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) + fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) c.Assert(err, check.IsNil) - resources.Prefix = "new" - resources.Maxkeys = 5 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "new", "", "", 5) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, 2) + c.Assert(len(result.Objects), check.Equals, 2) } // check ordering of pages { - resources.Prefix = "" - resources.Maxkeys = 1000 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "", "", 1000) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "newPrefix") - c.Assert(objects[1].Object, check.Equals, "newPrefix2") - c.Assert(objects[2].Object, check.Equals, "obj0") - c.Assert(objects[3].Object, check.Equals, "obj1") - c.Assert(objects[4].Object, check.Equals, "obj10") + c.Assert(result.Objects[0].Object, check.Equals, "newPrefix") + c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2") + c.Assert(result.Objects[2].Object, check.Equals, "obj0") + c.Assert(result.Objects[3].Object, check.Equals, "obj1") + c.Assert(result.Objects[4].Object, check.Equals, "obj10") } // check delimited results with delimiter and prefix @@ -226,72 +216,48 @@ func testPaging(c *check.C, create func() Filesystem) { c.Assert(err, check.IsNil) _, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) c.Assert(err, check.IsNil) - var prefixes []string - resources.CommonPrefixes = prefixes // allocate new everytime - resources.Delimiter = "/" - resources.Prefix = "this/is/" - resources.Maxkeys = 10 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "this/is/", "", "/", 10) c.Assert(err, check.IsNil) - c.Assert(len(objects), check.Equals, 1) - c.Assert(resources.CommonPrefixes[0], check.Equals, "this/is/also/") + c.Assert(len(result.Objects), check.Equals, 1) + c.Assert(result.Prefixes[0], check.Equals, "this/is/also/") } - time.Sleep(time.Second) // check delimited results with delimiter without prefix { - var prefixes []string - resources.CommonPrefixes = prefixes // allocate new everytime - resources.Delimiter = "/" - resources.Prefix = "" - resources.Maxkeys = 1000 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "", "/", 1000) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "newPrefix") - c.Assert(objects[1].Object, check.Equals, "newPrefix2") - c.Assert(objects[2].Object, check.Equals, "obj0") - c.Assert(objects[3].Object, check.Equals, "obj1") - c.Assert(objects[4].Object, check.Equals, "obj10") - c.Assert(resources.CommonPrefixes[0], check.Equals, "this/") + c.Assert(result.Objects[0].Object, check.Equals, "newPrefix") + c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2") + c.Assert(result.Objects[2].Object, check.Equals, "obj0") + c.Assert(result.Objects[3].Object, check.Equals, "obj1") + c.Assert(result.Objects[4].Object, check.Equals, "obj10") + c.Assert(result.Prefixes[0], check.Equals, "this/") } // check results with Marker { - var prefixes []string - resources.CommonPrefixes = prefixes // allocate new everytime - resources.Prefix = "" - resources.Marker = "newPrefix" - resources.Delimiter = "" - resources.Maxkeys = 3 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "", "newPrefix", "", 3) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "newPrefix2") - c.Assert(objects[1].Object, check.Equals, "obj0") - c.Assert(objects[2].Object, check.Equals, "obj1") + c.Assert(result.Objects[0].Object, check.Equals, "newPrefix2") + c.Assert(result.Objects[1].Object, check.Equals, "obj0") + c.Assert(result.Objects[2].Object, check.Equals, "obj1") } // check ordering of results with prefix { - resources.Prefix = "obj" - resources.Delimiter = "" - resources.Marker = "" - resources.Maxkeys = 1000 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "obj", "", "", 1000) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "obj0") - c.Assert(objects[1].Object, check.Equals, "obj1") - c.Assert(objects[2].Object, check.Equals, "obj10") - c.Assert(objects[3].Object, check.Equals, "obj2") - c.Assert(objects[4].Object, check.Equals, "obj3") + c.Assert(result.Objects[0].Object, check.Equals, "obj0") + c.Assert(result.Objects[1].Object, check.Equals, "obj1") + c.Assert(result.Objects[2].Object, check.Equals, "obj10") + c.Assert(result.Objects[3].Object, check.Equals, "obj2") + c.Assert(result.Objects[4].Object, check.Equals, "obj3") } // check ordering of results with prefix and no paging { - resources.Prefix = "new" - resources.Marker = "" - resources.Maxkeys = 5 - objects, resources, err = fs.ListObjects("bucket", resources) + result, err = fs.ListObjects("bucket", "new", "", "", 5) c.Assert(err, check.IsNil) - c.Assert(objects[0].Object, check.Equals, "newPrefix") - c.Assert(objects[1].Object, check.Equals, "newPrefix2") + c.Assert(result.Objects[0].Object, check.Equals, "newPrefix") + c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2") } } @@ -416,11 +382,10 @@ func testListBucketsOrder(c *check.C, create func() Filesystem) { func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) { fs := create() - resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000} - objects, resources, err := fs.ListObjects("bucket", resources) + result, err := fs.ListObjects("bucket", "", "", "", 1000) c.Assert(err, check.Not(check.IsNil)) - c.Assert(resources.IsTruncated, check.Equals, false) - c.Assert(len(objects), check.Equals, 0) + c.Assert(result.IsTruncated, check.Equals, false) + c.Assert(len(result.Objects), check.Equals, 0) } func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) { diff --git a/pkg/fs/definitions.go b/pkg/fs/definitions.go index 6c656167f..93924cdc5 100644 --- a/pkg/fs/definitions.go +++ b/pkg/fs/definitions.go @@ -118,19 +118,15 @@ type BucketMultipartResourcesMetadata struct { CommonPrefixes []string } -// BucketResourcesMetadata - various types of bucket resources -type BucketResourcesMetadata struct { - Prefix string - Marker string - NextMarker string - Maxkeys int - EncodingType string - Delimiter string - IsTruncated bool - CommonPrefixes []string -} - -type ListObjectsReq struct { +// ListObjectsResult - container for list object request results. +type ListObjectsResult struct { + IsTruncated bool + NextMarker string + Objects []ObjectMetadata + Prefixes []string +} + +type listObjectsReq struct { Bucket string Prefix string Marker string @@ -138,21 +134,14 @@ type ListObjectsReq struct { MaxKeys int } -type ListObjectsResp struct { - IsTruncated bool - NextMarker string - Objects []ObjectMetadata - Prefixes []string -} - type listServiceReq struct { - req ListObjectsReq - respCh chan ListObjectsResp + req listObjectsReq + respCh chan ListObjectsResult } type listWorkerReq struct { - req ListObjectsReq - respCh chan ListObjectsResp + req listObjectsReq + respCh chan ListObjectsResult } // CompletePart - completed part container diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 041067727..54a5cd01d 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2015 Minio, Inc. + * Minio Cloud Storage, (C) 2015-2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ package fs import ( "errors" - "fmt" "os" "path/filepath" "strings" @@ -27,74 +26,88 @@ import ( "github.com/minio/minio-xl/pkg/probe" ) -func (fs Filesystem) listWorker(startReq ListObjectsReq) (chan<- listWorkerReq, *probe.Error) { - Separator := string(os.PathSeparator) +func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, *probe.Error) { bucket := startReq.Bucket prefix := startReq.Prefix marker := startReq.Marker delimiter := startReq.Delimiter - quit := make(chan bool) - if marker != "" { - return nil, probe.NewError(errors.New("Not supported")) - } - if delimiter != "" && delimiter != Separator { - return nil, probe.NewError(errors.New("Not supported")) - } + quitWalker := make(chan bool) reqCh := make(chan listWorkerReq) walkerCh := make(chan ObjectMetadata) go func() { - rootPath := filepath.Join(fs.path, bucket, prefix) - stripPath := filepath.Join(fs.path, bucket) + Separator + var rootPath string + bucketPath := filepath.Join(fs.path, bucket) + trimBucketPathPrefix := bucketPath + string(os.PathSeparator) + prefixPath := trimBucketPathPrefix + prefix + st, err := os.Stat(prefixPath) + if err != nil && os.IsNotExist(err) { + rootPath = bucketPath + } else { + if st.IsDir() && !strings.HasSuffix(prefix, delimiter) { + rootPath = bucketPath + } else { + rootPath = prefixPath + } + } filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { if path == rootPath { return nil } if info.IsDir() { - path = path + Separator - } - objectName := strings.TrimPrefix(path, stripPath) - object := ObjectMetadata{ - Object: objectName, - Created: info.ModTime(), - Mode: info.Mode(), - Size: info.Size(), - } - select { - case walkerCh <- object: - // do nothings - case <-quit: - fmt.Println("walker got quit") - // returning error ends the Walk() - return errors.New("Ending") + path = path + string(os.PathSeparator) } - if delimiter == Separator && info.IsDir() { - return filepath.SkipDir + objectName := strings.TrimPrefix(path, trimBucketPathPrefix) + if strings.HasPrefix(objectName, prefix) { + if marker >= objectName { + return nil + } + object := ObjectMetadata{ + Object: objectName, + Created: info.ModTime(), + Mode: info.Mode(), + Size: info.Size(), + } + select { + case walkerCh <- object: + // Do nothing + case <-quitWalker: + // Returning error ends the Walk() + return errors.New("Ending") + } + if delimiter != "" && info.IsDir() { + return filepath.SkipDir + } } return nil }) close(walkerCh) }() go func() { - resp := ListObjectsResp{} + resp := ListObjectsResult{} for { select { case <-time.After(10 * time.Second): - fmt.Println("worker got timeout") - quit <- true - timeoutReq := ListObjectsReq{bucket, prefix, marker, delimiter, 0} - fmt.Println("after timeout", fs) + quitWalker <- true + timeoutReq := listObjectsReq{bucket, prefix, marker, delimiter, 0} fs.timeoutReqCh <- timeoutReq // FIXME: can there be a race such that sender on reqCh panics? return - case req := <-reqCh: - resp = ListObjectsResp{} + case req, ok := <-reqCh: + if !ok { + return + } + resp = ListObjectsResult{} resp.Objects = make([]ObjectMetadata, 0) resp.Prefixes = make([]string, 0) count := 0 for object := range walkerCh { + if count == req.req.MaxKeys { + resp.IsTruncated = true + break + } if object.Mode.IsDir() { if delimiter == "" { - // skip directories for recursive list + // Skip directories for recursive list continue } resp.Prefixes = append(resp.Prefixes, object.Object) @@ -103,13 +116,7 @@ func (fs Filesystem) listWorker(startReq ListObjectsReq) (chan<- listWorkerReq, } resp.NextMarker = object.Object count++ - if count == req.req.MaxKeys { - resp.IsTruncated = true - break - } } - fmt.Println("response objects: ", len(resp.Objects)) - marker = resp.NextMarker req.respCh <- resp } } @@ -118,9 +125,8 @@ func (fs Filesystem) listWorker(startReq ListObjectsReq) (chan<- listWorkerReq, } func (fs *Filesystem) startListService() *probe.Error { - fmt.Println("startListService starting") listServiceReqCh := make(chan listServiceReq) - timeoutReqCh := make(chan ListObjectsReq) + timeoutReqCh := make(chan listObjectsReq) reqToListWorkerReqCh := make(map[string](chan<- listWorkerReq)) reqToStr := func(bucket string, prefix string, marker string, delimiter string) string { return strings.Join([]string{bucket, prefix, marker, delimiter}, ":") @@ -129,7 +135,6 @@ func (fs *Filesystem) startListService() *probe.Error { for { select { case timeoutReq := <-timeoutReqCh: - fmt.Println("listservice got timeout on ", timeoutReq) reqStr := reqToStr(timeoutReq.Bucket, timeoutReq.Prefix, timeoutReq.Marker, timeoutReq.Delimiter) listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] if ok { @@ -137,27 +142,22 @@ func (fs *Filesystem) startListService() *probe.Error { } delete(reqToListWorkerReqCh, reqStr) case serviceReq := <-listServiceReqCh: - fmt.Println("serviceReq received", serviceReq) - fmt.Println("sending to listservicereqch", fs) - reqStr := reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, serviceReq.req.Marker, serviceReq.req.Delimiter) listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] if !ok { var err *probe.Error listWorkerReqCh, err = fs.listWorker(serviceReq.req) if err != nil { - fmt.Println("listWorker returned error", err) - serviceReq.respCh <- ListObjectsResp{} + serviceReq.respCh <- ListObjectsResult{} return } reqToListWorkerReqCh[reqStr] = listWorkerReqCh } - respCh := make(chan ListObjectsResp) + respCh := make(chan ListObjectsResult) listWorkerReqCh <- listWorkerReq{serviceReq.req, respCh} resp, ok := <-respCh if !ok { - serviceReq.respCh <- ListObjectsResp{} - fmt.Println("listWorker resp was not ok") + serviceReq.respCh <- ListObjectsResult{} return } delete(reqToListWorkerReqCh, reqStr) @@ -177,13 +177,12 @@ func (fs *Filesystem) startListService() *probe.Error { } // ListObjects - -func (fs Filesystem) ListObjects(bucket string, req ListObjectsReq) (ListObjectsResp, *probe.Error) { +func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() - Separator := string(os.PathSeparator) if !IsValidBucketName(bucket) { - return ListObjectsResp{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) + return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } bucket = fs.denormalizeBucket(bucket) @@ -191,39 +190,34 @@ func (fs Filesystem) ListObjects(bucket string, req ListObjectsReq) (ListObjects // check bucket exists if _, e := os.Stat(rootPrefix); e != nil { if os.IsNotExist(e) { - return ListObjectsResp{}, probe.NewError(BucketNotFound{Bucket: bucket}) + return ListObjectsResult{}, probe.NewError(BucketNotFound{Bucket: bucket}) } - return ListObjectsResp{}, probe.NewError(e) - } - - canonicalize := func(str string) string { - return strings.Replace(str, "/", string(os.PathSeparator), -1) - } - decanonicalize := func(str string) string { - return strings.Replace(str, string(os.PathSeparator), "/", -1) + return ListObjectsResult{}, probe.NewError(e) } + req := listObjectsReq{} req.Bucket = bucket - req.Prefix = canonicalize(req.Prefix) - req.Marker = canonicalize(req.Marker) - req.Delimiter = canonicalize(req.Delimiter) - - if req.Delimiter != "" && req.Delimiter != Separator { - return ListObjectsResp{}, probe.NewError(errors.New("not supported")) - } + req.Prefix = filepath.FromSlash(prefix) + req.Marker = filepath.FromSlash(marker) + req.Delimiter = filepath.FromSlash(delimiter) + req.MaxKeys = maxKeys - respCh := make(chan ListObjectsResp) + respCh := make(chan ListObjectsResult) fs.listServiceReqCh <- listServiceReq{req, respCh} resp := <-respCh for i := 0; i < len(resp.Prefixes); i++ { - resp.Prefixes[i] = decanonicalize(resp.Prefixes[i]) + resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i]) } for i := 0; i < len(resp.Objects); i++ { - resp.Objects[i].Object = decanonicalize(resp.Objects[i].Object) + resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object) } if req.Delimiter == "" { - // unset NextMaker for recursive list + // This element is set only if you have delimiter set. + // If response does not include the NextMaker and it is + // truncated, you can use the value of the last Key in the + // response as the marker in the subsequent request to get the + // next set of object keys. resp.NextMarker = "" } return resp, nil diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 8f6f7571c..39f6a44a1 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -34,7 +34,7 @@ type Filesystem struct { multiparts *Multiparts buckets *Buckets listServiceReqCh chan<- listServiceReq - timeoutReqCh chan<- ListObjectsReq + timeoutReqCh chan<- listObjectsReq } // Buckets holds acl information @@ -94,10 +94,10 @@ func New(rootPath string) (Filesystem, *probe.Error) { return Filesystem{}, err.Trace() } } - a := Filesystem{lock: new(sync.Mutex)} - a.path = rootPath - a.multiparts = multiparts - a.buckets = buckets + fs := Filesystem{lock: new(sync.Mutex)} + fs.path = rootPath + fs.multiparts = multiparts + fs.buckets = buckets /// Defaults // maximum buckets to be listed from list buckets. @@ -105,6 +105,7 @@ func New(rootPath string) (Filesystem, *probe.Error) { // minium free disk required for i/o operations to succeed. fs.minFreeDisk = 10 + // Start list goroutine. err = fs.startListService() if err != nil { return Filesystem{}, err.Trace(rootPath) diff --git a/server-config.go b/server-config.go index c6c1c3368..b6dab99b7 100644 --- a/server-config.go +++ b/server-config.go @@ -159,24 +159,6 @@ func createConfigPath() *probe.Error { return nil } -// isAuthConfigFileExists is auth config file exists? -func isConfigFileExists() bool { - if _, err := os.Stat(mustGetConfigFile()); err != nil { - if os.IsNotExist(err) { - return false - } - panic(err) - } - return true -} - -// mustGetConfigFile always get users config file, if not panic -func mustGetConfigFile() string { - configFile, err := getConfigFile() - fatalIf(err.Trace(), "Unable to get config file.", nil) - return configFile -} - // getConfigFile get users config file func getConfigFile() (string, *probe.Error) { configPath, err := getConfigPath() diff --git a/server_fs_test.go b/server_fs_test.go index fb500d870..d833515f0 100644 --- a/server_fs_test.go +++ b/server_fs_test.go @@ -126,16 +126,17 @@ var ignoredHeaders = map[string]bool{ } func (s *MyAPIFSCacheSuite) newRequest(method, urlStr string, contentLength int64, body io.ReadSeeker) (*http.Request, error) { + if method == "" { + method = "POST" + } t := time.Now().UTC() + req, err := http.NewRequest(method, urlStr, nil) if err != nil { return nil, err } req.Header.Set("x-amz-date", t.Format(iso8601Format)) - if method == "" { - method = "POST" - } // add Content-Length req.ContentLength = contentLength diff --git a/update-main.go b/update-main.go index a4817f646..4fc972d96 100644 --- a/update-main.go +++ b/update-main.go @@ -74,12 +74,6 @@ const ( minioUpdateExperimentalURL = "https://dl.minio.io/server/minio/experimental/" ) -// minioUpdates container to hold updates json. -type minioUpdates struct { - BuildDate string - Platforms map[string]string -} - // updateMessage container to hold update messages. type updateMessage struct { Status string `json:"status"` From f5d6be158e7f10b9d9bccd5b144cfbdc9cc2a217 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 26 Jan 2016 02:19:55 -0800 Subject: [PATCH 3/6] listObjects: Simplify channel based changes. --- pkg/fs/definitions.go | 18 ---- pkg/fs/fs-bucket-listobjects.go | 183 +++++++++++++++++++++----------- pkg/fs/fs.go | 5 +- 3 files changed, 121 insertions(+), 85 deletions(-) diff --git a/pkg/fs/definitions.go b/pkg/fs/definitions.go index 93924cdc5..96c31a4ef 100644 --- a/pkg/fs/definitions.go +++ b/pkg/fs/definitions.go @@ -126,24 +126,6 @@ type ListObjectsResult struct { Prefixes []string } -type listObjectsReq struct { - Bucket string - Prefix string - Marker string - Delimiter string - MaxKeys int -} - -type listServiceReq struct { - req listObjectsReq - respCh chan ListObjectsResult -} - -type listWorkerReq struct { - req listObjectsReq - respCh chan ListObjectsResult -} - // CompletePart - completed part container type CompletePart struct { PartNumber int diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 54a5cd01d..6a3e98dcd 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -18,6 +18,7 @@ package fs import ( "errors" + "hash/fnv" "os" "path/filepath" "strings" @@ -26,38 +27,59 @@ import ( "github.com/minio/minio-xl/pkg/probe" ) -func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, *probe.Error) { - bucket := startReq.Bucket - prefix := startReq.Prefix - marker := startReq.Marker - delimiter := startReq.Delimiter +type listObjectsParams struct { + Bucket string + Prefix string + Marker string + Delimiter string + MaxKeys int +} + +type listServiceReq struct { + reqParams listObjectsParams + respCh chan ListObjectsResult +} + +type listWorkerReq struct { + respCh chan ListObjectsResult +} + +// listObjects - list objects lists objects upto maxKeys for a given +// prefix. +func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) { quitWalker := make(chan bool) reqCh := make(chan listWorkerReq) walkerCh := make(chan ObjectMetadata) go func() { - var rootPath string + defer close(walkerCh) + var walkPath string bucketPath := filepath.Join(fs.path, bucket) - trimBucketPathPrefix := bucketPath + string(os.PathSeparator) - prefixPath := trimBucketPathPrefix + prefix + // Bucket path prefix should always end with a separator. + bucketPathPrefix := bucketPath + string(os.PathSeparator) + prefixPath := bucketPathPrefix + prefix st, err := os.Stat(prefixPath) if err != nil && os.IsNotExist(err) { - rootPath = bucketPath + walkPath = bucketPath } else { if st.IsDir() && !strings.HasSuffix(prefix, delimiter) { - rootPath = bucketPath + walkPath = bucketPath } else { - rootPath = prefixPath + walkPath = prefixPath } } - filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { - if path == rootPath { + filepath.Walk(walkPath, func(path string, info os.FileInfo, err error) error { + // We don't need to list the walk path. + if path == walkPath { return nil } + // For all incoming directories add a ending separator. if info.IsDir() { path = path + string(os.PathSeparator) } - objectName := strings.TrimPrefix(path, trimBucketPathPrefix) + // Extract object name. + objectName := strings.TrimPrefix(path, bucketPathPrefix) if strings.HasPrefix(objectName, prefix) { + // For objectName lesser than marker, ignore. if marker >= objectName { return nil } @@ -68,52 +90,56 @@ func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, Size: info.Size(), } select { + // Send object on walker channel. case walkerCh <- object: - // Do nothing case <-quitWalker: - // Returning error ends the Walk() - return errors.New("Ending") + // Returning error ends the file tree Walk(). + return errors.New("Quit list worker.") } + // If delimiter is set, we stop if current path is a + // directory. if delimiter != "" && info.IsDir() { return filepath.SkipDir } } return nil }) - close(walkerCh) }() + go func() { - resp := ListObjectsResult{} for { select { + // Timeout after 10 seconds if request did not arrive for + // the given list parameters. case <-time.After(10 * time.Second): - quitWalker <- true - timeoutReq := listObjectsReq{bucket, prefix, marker, delimiter, 0} - fs.timeoutReqCh <- timeoutReq - // FIXME: can there be a race such that sender on reqCh panics? + quitWalker <- true // Quit file path walk if running. + // Send back the hash for this request. + fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter) return case req, ok := <-reqCh: if !ok { + // If the request channel is closed, no more + // requests return here. return } - resp = ListObjectsResult{} - resp.Objects = make([]ObjectMetadata, 0) - resp.Prefixes = make([]string, 0) - count := 0 + resp := ListObjectsResult{} + var count int for object := range walkerCh { - if count == req.req.MaxKeys { + if count == maxKeys { resp.IsTruncated = true break } + // If object is a directory. if object.Mode.IsDir() { if delimiter == "" { - // Skip directories for recursive list + // Skip directories for recursive listing. continue } resp.Prefixes = append(resp.Prefixes, object.Object) } else { resp.Objects = append(resp.Objects, object) } + // Set the next marker for the next request. resp.NextMarker = object.Object count++ } @@ -124,59 +150,88 @@ func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, return reqCh, nil } -func (fs *Filesystem) startListService() *probe.Error { - listServiceReqCh := make(chan listServiceReq) - timeoutReqCh := make(chan listObjectsReq) - reqToListWorkerReqCh := make(map[string](chan<- listWorkerReq)) - reqToStr := func(bucket string, prefix string, marker string, delimiter string) string { - return strings.Join([]string{bucket, prefix, marker, delimiter}, ":") +// fnvSum calculates a hash for concatenation of all input strings. +func fnvSum(elements ...string) uint32 { + fnvHash := fnv.New32a() + for _, element := range elements { + fnvHash.Write([]byte(element)) } + return fnvHash.Sum32() +} + +// listObjectsService - list objects service manages various incoming +// list object requests by delegating them to an existing listObjects +// routine or initializes a new listObjects routine. +func (fs *Filesystem) listObjectsService() *probe.Error { + // Initialize list service request channel. + listServiceReqCh := make(chan listServiceReq) + fs.listServiceReqCh = listServiceReqCh + + // Initialize timeout request channel to receive request hashes of + // timed-out requests. + timeoutReqCh := make(chan uint32) + fs.timeoutReqCh = timeoutReqCh + + // Initialize request hash to list worker map. + reqToListWorkerReqCh := make(map[uint32]chan<- listWorkerReq) + + // Start service in a go routine. go func() { for { select { - case timeoutReq := <-timeoutReqCh: - reqStr := reqToStr(timeoutReq.Bucket, timeoutReq.Prefix, timeoutReq.Marker, timeoutReq.Delimiter) - listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] + case reqHash := <-timeoutReqCh: + // For requests which have timed-out, close the worker + // channels proactively, this may happen for idle + // workers once in 10seconds. + listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash] if ok { close(listWorkerReqCh) } - delete(reqToListWorkerReqCh, reqStr) - case serviceReq := <-listServiceReqCh: - reqStr := reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, serviceReq.req.Marker, serviceReq.req.Delimiter) - listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] + delete(reqToListWorkerReqCh, reqHash) + case srvReq := <-listServiceReqCh: + // Save the params for readability. + bucket := srvReq.reqParams.Bucket + prefix := srvReq.reqParams.Prefix + marker := srvReq.reqParams.Marker + delimiter := srvReq.reqParams.Delimiter + maxKeys := srvReq.reqParams.MaxKeys + + // Generate hash. + reqHash := fnvSum(bucket, prefix, marker, delimiter) + listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash] if !ok { var err *probe.Error - listWorkerReqCh, err = fs.listWorker(serviceReq.req) + listWorkerReqCh, err = fs.listObjects(bucket, prefix, marker, delimiter, maxKeys) if err != nil { - serviceReq.respCh <- ListObjectsResult{} + srvReq.respCh <- ListObjectsResult{} return } - reqToListWorkerReqCh[reqStr] = listWorkerReqCh + reqToListWorkerReqCh[reqHash] = listWorkerReqCh } respCh := make(chan ListObjectsResult) - listWorkerReqCh <- listWorkerReq{serviceReq.req, respCh} + listWorkerReqCh <- listWorkerReq{respCh} resp, ok := <-respCh if !ok { - serviceReq.respCh <- ListObjectsResult{} + srvReq.respCh <- ListObjectsResult{} return } - delete(reqToListWorkerReqCh, reqStr) + delete(reqToListWorkerReqCh, reqHash) if !resp.IsTruncated { close(listWorkerReqCh) } else { - reqStr = reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, resp.NextMarker, serviceReq.req.Delimiter) - reqToListWorkerReqCh[reqStr] = listWorkerReqCh + nextMarker := resp.NextMarker + reqHash = fnvSum(bucket, prefix, nextMarker, delimiter) + reqToListWorkerReqCh[reqHash] = listWorkerReqCh } - serviceReq.respCh <- resp + srvReq.respCh <- resp } } }() - fs.timeoutReqCh = timeoutReqCh - fs.listServiceReqCh = listServiceReqCh return nil } -// ListObjects - +// ListObjects - lists all objects for a given prefix, returns upto +// maxKeys number of objects per call. func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -195,24 +250,24 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe return ListObjectsResult{}, probe.NewError(e) } - req := listObjectsReq{} - req.Bucket = bucket - req.Prefix = filepath.FromSlash(prefix) - req.Marker = filepath.FromSlash(marker) - req.Delimiter = filepath.FromSlash(delimiter) - req.MaxKeys = maxKeys + reqParams := listObjectsParams{} + reqParams.Bucket = bucket + reqParams.Prefix = filepath.FromSlash(prefix) + reqParams.Marker = filepath.FromSlash(marker) + reqParams.Delimiter = filepath.FromSlash(delimiter) + reqParams.MaxKeys = maxKeys respCh := make(chan ListObjectsResult) - fs.listServiceReqCh <- listServiceReq{req, respCh} + fs.listServiceReqCh <- listServiceReq{reqParams, respCh} resp := <-respCh - for i := 0; i < len(resp.Prefixes); i++ { + for i := range resp.Prefixes { resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i]) } - for i := 0; i < len(resp.Objects); i++ { + for i := range resp.Objects { resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object) } - if req.Delimiter == "" { + if reqParams.Delimiter == "" { // This element is set only if you have delimiter set. // If response does not include the NextMaker and it is // truncated, you can use the value of the last Key in the diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 39f6a44a1..99248299c 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -34,7 +34,7 @@ type Filesystem struct { multiparts *Multiparts buckets *Buckets listServiceReqCh chan<- listServiceReq - timeoutReqCh chan<- listObjectsReq + timeoutReqCh chan<- uint32 } // Buckets holds acl information @@ -106,8 +106,7 @@ func New(rootPath string) (Filesystem, *probe.Error) { fs.minFreeDisk = 10 // Start list goroutine. - err = fs.startListService() - if err != nil { + if err = fs.listObjectsService(); err != nil { return Filesystem{}, err.Trace(rootPath) } // Return here. From 1341fb79c33fadfb41d5ddff800d6db052a96026 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 26 Jan 2016 11:48:52 -0800 Subject: [PATCH 4/6] listBuckets: Bump up the limit of max buckets to 1000. --- pkg/fs/fs-bucket-listobjects.go | 25 +++++++++++++++++-------- pkg/fs/fs.go | 2 +- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 6a3e98dcd..ef031e277 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -27,14 +27,24 @@ import ( "github.com/minio/minio-xl/pkg/probe" ) +// listObjectsParams - list objects input parameters. type listObjectsParams struct { - Bucket string - Prefix string - Marker string + // Bucket name to list the objects for. + Bucket string + // list all objects with this parameter as common prefix. + Prefix string + // list all objects starting with object after marker in + // lexicographical order. + Marker string + // list all objects until the first occurrence of the delimtier + // after the prefix. Delimiter string - MaxKeys int + // maximum number of objects returned per listObjects() + // operation. + MaxKeys int } +// listServiceReq type listServiceReq struct { reqParams listObjectsParams respCh chan ListObjectsResult @@ -44,8 +54,7 @@ type listWorkerReq struct { respCh chan ListObjectsResult } -// listObjects - list objects lists objects upto maxKeys for a given -// prefix. +// listObjects - list objects lists objects upto maxKeys for a given prefix. func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) { quitWalker := make(chan bool) reqCh := make(chan listWorkerReq) @@ -67,7 +76,7 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe walkPath = prefixPath } } - filepath.Walk(walkPath, func(path string, info os.FileInfo, err error) error { + Walk(walkPath, func(path string, info os.FileInfo, err error) error { // We don't need to list the walk path. if path == walkPath { return nil @@ -99,7 +108,7 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe // If delimiter is set, we stop if current path is a // directory. if delimiter != "" && info.IsDir() { - return filepath.SkipDir + return ErrSkipDir } } return nil diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 99248299c..ed88cf747 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -101,7 +101,7 @@ func New(rootPath string) (Filesystem, *probe.Error) { /// Defaults // maximum buckets to be listed from list buckets. - fs.maxBuckets = 100 + fs.maxBuckets = 1000 // minium free disk required for i/o operations to succeed. fs.minFreeDisk = 10 From 13feabefd5604899b05114c4ed759572c27cae1a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 26 Jan 2016 12:08:45 -0800 Subject: [PATCH 5/6] diskInfo: Add DiskInfo API --- routers.go | 9 +++------ web-definitions.go | 3 +++ web-handlers.go | 14 ++++++++++++++ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/routers.go b/routers.go index 8f8e5809c..67bcf4e23 100644 --- a/routers.go +++ b/routers.go @@ -40,8 +40,8 @@ type CloudStorageAPI struct { // WebAPI container for Web API. type WebAPI struct { - // Do not check for incoming authorization header. - Anonymous bool + // FSPath filesystem path. + FSPath string // Once true log all incoming request. AccessLog bool // Minio client instance. @@ -53,9 +53,6 @@ func getWebAPIHandler(web *WebAPI) http.Handler { TimeValidityHandler, // Validate time. CorsHandler, // CORS added only for testing purposes. } - if !web.Anonymous { - mwHandlers = append(mwHandlers, AuthHandler) - } if web.AccessLog { mwHandlers = append(mwHandlers, AccessLogHandler) } @@ -120,7 +117,7 @@ func getNewWebAPI(conf cloudServerConfig) *WebAPI { fatalIf(probe.NewError(e), "Unable to initialize minio client", nil) web := &WebAPI{ - Anonymous: conf.Anonymous, + FSPath: conf.Path, AccessLog: conf.AccessLog, Client: client, } diff --git a/web-definitions.go b/web-definitions.go index 3bc868a83..1e183b67f 100644 --- a/web-definitions.go +++ b/web-definitions.go @@ -23,6 +23,9 @@ type MakeBucketArgs struct { BucketName string `json:"bucketName"` } +// DiskInfoArgs - disk info args. +type DiskInfoArgs struct{} + // ListBucketsArgs - list bucket args. type ListBucketsArgs struct{} diff --git a/web-handlers.go b/web-handlers.go index b6d7fc452..b5f89ddba 100644 --- a/web-handlers.go +++ b/web-handlers.go @@ -22,6 +22,7 @@ import ( "time" jwtgo "github.com/dgrijalva/jwt-go" + "github.com/minio/minio/pkg/disk" ) // isAuthenticated validates if any incoming request to be a valid JWT @@ -40,6 +41,19 @@ func isAuthenticated(req *http.Request) bool { return tokenRequest.Valid } +// DiskInfo - get disk statistics. +func (web *WebAPI) DiskInfo(r *http.Request, args *DiskInfoArgs, reply *disk.Info) error { + if !isAuthenticated(r) { + return errUnAuthorizedRequest + } + info, err := disk.GetInfo(web.FSPath) + if err != nil { + return err + } + *reply = info + return nil +} + // MakeBucket - make a bucket. func (web *WebAPI) MakeBucket(r *http.Request, args *MakeBucketArgs, reply *string) error { if !isAuthenticated(r) { From 18375b7794a4c36f03bbd350a53b0a787ec41d8a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 26 Jan 2016 12:34:04 -0800 Subject: [PATCH 6/6] ioutils: Add tests --- pkg/disk/disk_test.go | 1 + pkg/ioutils/ioutils.go | 24 +++++++++++++++-- pkg/ioutils/ioutils_test.go | 54 +++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 pkg/ioutils/ioutils_test.go diff --git a/pkg/disk/disk_test.go b/pkg/disk/disk_test.go index e463fa12f..f7e0fb024 100644 --- a/pkg/disk/disk_test.go +++ b/pkg/disk/disk_test.go @@ -34,6 +34,7 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestFree(c *C) { path, err := ioutil.TempDir(os.TempDir(), "minio-") + defer os.RemoveAll(path) c.Assert(err, IsNil) di, err := disk.GetInfo(path) diff --git a/pkg/ioutils/ioutils.go b/pkg/ioutils/ioutils.go index a81e59047..21273f3c6 100644 --- a/pkg/ioutils/ioutils.go +++ b/pkg/ioutils/ioutils.go @@ -1,3 +1,19 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package ioutils import ( @@ -20,10 +36,12 @@ func ReadDirN(dirname string, n int) ([]os.FileInfo, error) { return nil, err } list, err := f.Readdir(n) - f.Close() if err != nil { return nil, err } + if err = f.Close(); err != nil { + return nil, err + } sort.Sort(byName(list)) return list, nil } @@ -36,10 +54,12 @@ func ReadDirNamesN(dirname string, n int) ([]string, error) { return nil, err } names, err := f.Readdirnames(n) - f.Close() if err != nil { return nil, err } + if err = f.Close(); err != nil { + return nil, err + } sort.Strings(names) return names, nil } diff --git a/pkg/ioutils/ioutils_test.go b/pkg/ioutils/ioutils_test.go new file mode 100644 index 000000000..996bd02cd --- /dev/null +++ b/pkg/ioutils/ioutils_test.go @@ -0,0 +1,54 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ioutils_test + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/minio/minio/pkg/ioutils" + + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { TestingT(t) } + +type MySuite struct{} + +var _ = Suite(&MySuite{}) + +func (s *MySuite) TestIoutils(c *C) { + path, err := ioutil.TempDir(os.TempDir(), "minio-") + c.Assert(err, IsNil) + defer os.RemoveAll(path) + + var count int + for count < 102 { + count++ + err = os.MkdirAll(filepath.Join(path, fmt.Sprintf("minio-%d", count)), 0700) + c.Assert(err, IsNil) + } + dirs, err := ioutils.ReadDirN(path, 100) + c.Assert(err, IsNil) + c.Assert(len(dirs), Equals, 100) + dirNames, err := ioutils.ReadDirNamesN(path, 100) + c.Assert(err, IsNil) + c.Assert(len(dirNames), Equals, 100) +}