|
|
@ -18,6 +18,7 @@ package fs |
|
|
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
import ( |
|
|
|
"errors" |
|
|
|
"errors" |
|
|
|
|
|
|
|
"hash/fnv" |
|
|
|
"os" |
|
|
|
"os" |
|
|
|
"path/filepath" |
|
|
|
"path/filepath" |
|
|
|
"strings" |
|
|
|
"strings" |
|
|
@ -26,38 +27,59 @@ import ( |
|
|
|
"github.com/minio/minio-xl/pkg/probe" |
|
|
|
"github.com/minio/minio-xl/pkg/probe" |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, *probe.Error) { |
|
|
|
type listObjectsParams struct { |
|
|
|
bucket := startReq.Bucket |
|
|
|
Bucket string |
|
|
|
prefix := startReq.Prefix |
|
|
|
Prefix string |
|
|
|
marker := startReq.Marker |
|
|
|
Marker string |
|
|
|
delimiter := startReq.Delimiter |
|
|
|
Delimiter string |
|
|
|
|
|
|
|
MaxKeys int |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type listServiceReq struct { |
|
|
|
|
|
|
|
reqParams listObjectsParams |
|
|
|
|
|
|
|
respCh chan ListObjectsResult |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type listWorkerReq struct { |
|
|
|
|
|
|
|
respCh chan ListObjectsResult |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// listObjects - list objects lists objects upto maxKeys for a given
|
|
|
|
|
|
|
|
// prefix.
|
|
|
|
|
|
|
|
func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) { |
|
|
|
quitWalker := make(chan bool) |
|
|
|
quitWalker := make(chan bool) |
|
|
|
reqCh := make(chan listWorkerReq) |
|
|
|
reqCh := make(chan listWorkerReq) |
|
|
|
walkerCh := make(chan ObjectMetadata) |
|
|
|
walkerCh := make(chan ObjectMetadata) |
|
|
|
go func() { |
|
|
|
go func() { |
|
|
|
var rootPath string |
|
|
|
defer close(walkerCh) |
|
|
|
|
|
|
|
var walkPath string |
|
|
|
bucketPath := filepath.Join(fs.path, bucket) |
|
|
|
bucketPath := filepath.Join(fs.path, bucket) |
|
|
|
trimBucketPathPrefix := bucketPath + string(os.PathSeparator) |
|
|
|
// Bucket path prefix should always end with a separator.
|
|
|
|
prefixPath := trimBucketPathPrefix + prefix |
|
|
|
bucketPathPrefix := bucketPath + string(os.PathSeparator) |
|
|
|
|
|
|
|
prefixPath := bucketPathPrefix + prefix |
|
|
|
st, err := os.Stat(prefixPath) |
|
|
|
st, err := os.Stat(prefixPath) |
|
|
|
if err != nil && os.IsNotExist(err) { |
|
|
|
if err != nil && os.IsNotExist(err) { |
|
|
|
rootPath = bucketPath |
|
|
|
walkPath = bucketPath |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
if st.IsDir() && !strings.HasSuffix(prefix, delimiter) { |
|
|
|
if st.IsDir() && !strings.HasSuffix(prefix, delimiter) { |
|
|
|
rootPath = bucketPath |
|
|
|
walkPath = bucketPath |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
rootPath = prefixPath |
|
|
|
walkPath = prefixPath |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { |
|
|
|
filepath.Walk(walkPath, func(path string, info os.FileInfo, err error) error { |
|
|
|
if path == rootPath { |
|
|
|
// We don't need to list the walk path.
|
|
|
|
|
|
|
|
if path == walkPath { |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// For all incoming directories add a ending separator.
|
|
|
|
if info.IsDir() { |
|
|
|
if info.IsDir() { |
|
|
|
path = path + string(os.PathSeparator) |
|
|
|
path = path + string(os.PathSeparator) |
|
|
|
} |
|
|
|
} |
|
|
|
objectName := strings.TrimPrefix(path, trimBucketPathPrefix) |
|
|
|
// Extract object name.
|
|
|
|
|
|
|
|
objectName := strings.TrimPrefix(path, bucketPathPrefix) |
|
|
|
if strings.HasPrefix(objectName, prefix) { |
|
|
|
if strings.HasPrefix(objectName, prefix) { |
|
|
|
|
|
|
|
// For objectName lesser than marker, ignore.
|
|
|
|
if marker >= objectName { |
|
|
|
if marker >= objectName { |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
@ -68,52 +90,56 @@ func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, |
|
|
|
Size: info.Size(), |
|
|
|
Size: info.Size(), |
|
|
|
} |
|
|
|
} |
|
|
|
select { |
|
|
|
select { |
|
|
|
|
|
|
|
// Send object on walker channel.
|
|
|
|
case walkerCh <- object: |
|
|
|
case walkerCh <- object: |
|
|
|
// Do nothing
|
|
|
|
|
|
|
|
case <-quitWalker: |
|
|
|
case <-quitWalker: |
|
|
|
// Returning error ends the Walk()
|
|
|
|
// Returning error ends the file tree Walk().
|
|
|
|
return errors.New("Ending") |
|
|
|
return errors.New("Quit list worker.") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// If delimiter is set, we stop if current path is a
|
|
|
|
|
|
|
|
// directory.
|
|
|
|
if delimiter != "" && info.IsDir() { |
|
|
|
if delimiter != "" && info.IsDir() { |
|
|
|
return filepath.SkipDir |
|
|
|
return filepath.SkipDir |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
}) |
|
|
|
}) |
|
|
|
close(walkerCh) |
|
|
|
|
|
|
|
}() |
|
|
|
}() |
|
|
|
|
|
|
|
|
|
|
|
go func() { |
|
|
|
go func() { |
|
|
|
resp := ListObjectsResult{} |
|
|
|
|
|
|
|
for { |
|
|
|
for { |
|
|
|
select { |
|
|
|
select { |
|
|
|
|
|
|
|
// Timeout after 10 seconds if request did not arrive for
|
|
|
|
|
|
|
|
// the given list parameters.
|
|
|
|
case <-time.After(10 * time.Second): |
|
|
|
case <-time.After(10 * time.Second): |
|
|
|
quitWalker <- true |
|
|
|
quitWalker <- true // Quit file path walk if running.
|
|
|
|
timeoutReq := listObjectsReq{bucket, prefix, marker, delimiter, 0} |
|
|
|
// Send back the hash for this request.
|
|
|
|
fs.timeoutReqCh <- timeoutReq |
|
|
|
fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter) |
|
|
|
// FIXME: can there be a race such that sender on reqCh panics?
|
|
|
|
|
|
|
|
return |
|
|
|
return |
|
|
|
case req, ok := <-reqCh: |
|
|
|
case req, ok := <-reqCh: |
|
|
|
if !ok { |
|
|
|
if !ok { |
|
|
|
|
|
|
|
// If the request channel is closed, no more
|
|
|
|
|
|
|
|
// requests return here.
|
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
resp = ListObjectsResult{} |
|
|
|
resp := ListObjectsResult{} |
|
|
|
resp.Objects = make([]ObjectMetadata, 0) |
|
|
|
var count int |
|
|
|
resp.Prefixes = make([]string, 0) |
|
|
|
|
|
|
|
count := 0 |
|
|
|
|
|
|
|
for object := range walkerCh { |
|
|
|
for object := range walkerCh { |
|
|
|
if count == req.req.MaxKeys { |
|
|
|
if count == maxKeys { |
|
|
|
resp.IsTruncated = true |
|
|
|
resp.IsTruncated = true |
|
|
|
break |
|
|
|
break |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// If object is a directory.
|
|
|
|
if object.Mode.IsDir() { |
|
|
|
if object.Mode.IsDir() { |
|
|
|
if delimiter == "" { |
|
|
|
if delimiter == "" { |
|
|
|
// Skip directories for recursive list
|
|
|
|
// Skip directories for recursive listing.
|
|
|
|
continue |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
resp.Prefixes = append(resp.Prefixes, object.Object) |
|
|
|
resp.Prefixes = append(resp.Prefixes, object.Object) |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
resp.Objects = append(resp.Objects, object) |
|
|
|
resp.Objects = append(resp.Objects, object) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Set the next marker for the next request.
|
|
|
|
resp.NextMarker = object.Object |
|
|
|
resp.NextMarker = object.Object |
|
|
|
count++ |
|
|
|
count++ |
|
|
|
} |
|
|
|
} |
|
|
@ -124,59 +150,88 @@ func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, |
|
|
|
return reqCh, nil |
|
|
|
return reqCh, nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (fs *Filesystem) startListService() *probe.Error { |
|
|
|
// fnvSum calculates a hash for concatenation of all input strings.
|
|
|
|
listServiceReqCh := make(chan listServiceReq) |
|
|
|
func fnvSum(elements ...string) uint32 { |
|
|
|
timeoutReqCh := make(chan listObjectsReq) |
|
|
|
fnvHash := fnv.New32a() |
|
|
|
reqToListWorkerReqCh := make(map[string](chan<- listWorkerReq)) |
|
|
|
for _, element := range elements { |
|
|
|
reqToStr := func(bucket string, prefix string, marker string, delimiter string) string { |
|
|
|
fnvHash.Write([]byte(element)) |
|
|
|
return strings.Join([]string{bucket, prefix, marker, delimiter}, ":") |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return fnvHash.Sum32() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// listObjectsService - list objects service manages various incoming
|
|
|
|
|
|
|
|
// list object requests by delegating them to an existing listObjects
|
|
|
|
|
|
|
|
// routine or initializes a new listObjects routine.
|
|
|
|
|
|
|
|
func (fs *Filesystem) listObjectsService() *probe.Error { |
|
|
|
|
|
|
|
// Initialize list service request channel.
|
|
|
|
|
|
|
|
listServiceReqCh := make(chan listServiceReq) |
|
|
|
|
|
|
|
fs.listServiceReqCh = listServiceReqCh |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Initialize timeout request channel to receive request hashes of
|
|
|
|
|
|
|
|
// timed-out requests.
|
|
|
|
|
|
|
|
timeoutReqCh := make(chan uint32) |
|
|
|
|
|
|
|
fs.timeoutReqCh = timeoutReqCh |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Initialize request hash to list worker map.
|
|
|
|
|
|
|
|
reqToListWorkerReqCh := make(map[uint32]chan<- listWorkerReq) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Start service in a go routine.
|
|
|
|
go func() { |
|
|
|
go func() { |
|
|
|
for { |
|
|
|
for { |
|
|
|
select { |
|
|
|
select { |
|
|
|
case timeoutReq := <-timeoutReqCh: |
|
|
|
case reqHash := <-timeoutReqCh: |
|
|
|
reqStr := reqToStr(timeoutReq.Bucket, timeoutReq.Prefix, timeoutReq.Marker, timeoutReq.Delimiter) |
|
|
|
// For requests which have timed-out, close the worker
|
|
|
|
listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] |
|
|
|
// channels proactively, this may happen for idle
|
|
|
|
|
|
|
|
// workers once in 10seconds.
|
|
|
|
|
|
|
|
listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash] |
|
|
|
if ok { |
|
|
|
if ok { |
|
|
|
close(listWorkerReqCh) |
|
|
|
close(listWorkerReqCh) |
|
|
|
} |
|
|
|
} |
|
|
|
delete(reqToListWorkerReqCh, reqStr) |
|
|
|
delete(reqToListWorkerReqCh, reqHash) |
|
|
|
case serviceReq := <-listServiceReqCh: |
|
|
|
case srvReq := <-listServiceReqCh: |
|
|
|
reqStr := reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, serviceReq.req.Marker, serviceReq.req.Delimiter) |
|
|
|
// Save the params for readability.
|
|
|
|
listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] |
|
|
|
bucket := srvReq.reqParams.Bucket |
|
|
|
|
|
|
|
prefix := srvReq.reqParams.Prefix |
|
|
|
|
|
|
|
marker := srvReq.reqParams.Marker |
|
|
|
|
|
|
|
delimiter := srvReq.reqParams.Delimiter |
|
|
|
|
|
|
|
maxKeys := srvReq.reqParams.MaxKeys |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Generate hash.
|
|
|
|
|
|
|
|
reqHash := fnvSum(bucket, prefix, marker, delimiter) |
|
|
|
|
|
|
|
listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash] |
|
|
|
if !ok { |
|
|
|
if !ok { |
|
|
|
var err *probe.Error |
|
|
|
var err *probe.Error |
|
|
|
listWorkerReqCh, err = fs.listWorker(serviceReq.req) |
|
|
|
listWorkerReqCh, err = fs.listObjects(bucket, prefix, marker, delimiter, maxKeys) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
serviceReq.respCh <- ListObjectsResult{} |
|
|
|
srvReq.respCh <- ListObjectsResult{} |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
reqToListWorkerReqCh[reqStr] = listWorkerReqCh |
|
|
|
reqToListWorkerReqCh[reqHash] = listWorkerReqCh |
|
|
|
} |
|
|
|
} |
|
|
|
respCh := make(chan ListObjectsResult) |
|
|
|
respCh := make(chan ListObjectsResult) |
|
|
|
listWorkerReqCh <- listWorkerReq{serviceReq.req, respCh} |
|
|
|
listWorkerReqCh <- listWorkerReq{respCh} |
|
|
|
resp, ok := <-respCh |
|
|
|
resp, ok := <-respCh |
|
|
|
if !ok { |
|
|
|
if !ok { |
|
|
|
serviceReq.respCh <- ListObjectsResult{} |
|
|
|
srvReq.respCh <- ListObjectsResult{} |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
delete(reqToListWorkerReqCh, reqStr) |
|
|
|
delete(reqToListWorkerReqCh, reqHash) |
|
|
|
if !resp.IsTruncated { |
|
|
|
if !resp.IsTruncated { |
|
|
|
close(listWorkerReqCh) |
|
|
|
close(listWorkerReqCh) |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
reqStr = reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, resp.NextMarker, serviceReq.req.Delimiter) |
|
|
|
nextMarker := resp.NextMarker |
|
|
|
reqToListWorkerReqCh[reqStr] = listWorkerReqCh |
|
|
|
reqHash = fnvSum(bucket, prefix, nextMarker, delimiter) |
|
|
|
|
|
|
|
reqToListWorkerReqCh[reqHash] = listWorkerReqCh |
|
|
|
} |
|
|
|
} |
|
|
|
serviceReq.respCh <- resp |
|
|
|
srvReq.respCh <- resp |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}() |
|
|
|
}() |
|
|
|
fs.timeoutReqCh = timeoutReqCh |
|
|
|
|
|
|
|
fs.listServiceReqCh = listServiceReqCh |
|
|
|
|
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// ListObjects -
|
|
|
|
// ListObjects - lists all objects for a given prefix, returns upto
|
|
|
|
|
|
|
|
// maxKeys number of objects per call.
|
|
|
|
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { |
|
|
|
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { |
|
|
|
fs.lock.Lock() |
|
|
|
fs.lock.Lock() |
|
|
|
defer fs.lock.Unlock() |
|
|
|
defer fs.lock.Unlock() |
|
|
@ -195,24 +250,24 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe |
|
|
|
return ListObjectsResult{}, probe.NewError(e) |
|
|
|
return ListObjectsResult{}, probe.NewError(e) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
req := listObjectsReq{} |
|
|
|
reqParams := listObjectsParams{} |
|
|
|
req.Bucket = bucket |
|
|
|
reqParams.Bucket = bucket |
|
|
|
req.Prefix = filepath.FromSlash(prefix) |
|
|
|
reqParams.Prefix = filepath.FromSlash(prefix) |
|
|
|
req.Marker = filepath.FromSlash(marker) |
|
|
|
reqParams.Marker = filepath.FromSlash(marker) |
|
|
|
req.Delimiter = filepath.FromSlash(delimiter) |
|
|
|
reqParams.Delimiter = filepath.FromSlash(delimiter) |
|
|
|
req.MaxKeys = maxKeys |
|
|
|
reqParams.MaxKeys = maxKeys |
|
|
|
|
|
|
|
|
|
|
|
respCh := make(chan ListObjectsResult) |
|
|
|
respCh := make(chan ListObjectsResult) |
|
|
|
fs.listServiceReqCh <- listServiceReq{req, respCh} |
|
|
|
fs.listServiceReqCh <- listServiceReq{reqParams, respCh} |
|
|
|
resp := <-respCh |
|
|
|
resp := <-respCh |
|
|
|
|
|
|
|
|
|
|
|
for i := 0; i < len(resp.Prefixes); i++ { |
|
|
|
for i := range resp.Prefixes { |
|
|
|
resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i]) |
|
|
|
resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i]) |
|
|
|
} |
|
|
|
} |
|
|
|
for i := 0; i < len(resp.Objects); i++ { |
|
|
|
for i := range resp.Objects { |
|
|
|
resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object) |
|
|
|
resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object) |
|
|
|
} |
|
|
|
} |
|
|
|
if req.Delimiter == "" { |
|
|
|
if reqParams.Delimiter == "" { |
|
|
|
// This element is set only if you have delimiter set.
|
|
|
|
// This element is set only if you have delimiter set.
|
|
|
|
// If response does not include the NextMaker and it is
|
|
|
|
// If response does not include the NextMaker and it is
|
|
|
|
// truncated, you can use the value of the last Key in the
|
|
|
|
// truncated, you can use the value of the last Key in the
|
|
|
|