listObjects: do not do stat during readdir()

* listObjects: improve response time by not doing stat during readDir() operation.

* listObjects: Add windows support.

* listObjects: Readdir() in batches to conserve memory. Add solaris build.

* listObjects: cleanup code.
master
Krishna Srinivas 9 years ago committed by Harshavardhana
parent 7623e0f8e8
commit 85ab1df5a8
  1. 90
      fs-bucket-listobjects.go
  2. 193
      fs-dir-common.go
  3. 101
      fs-dir-nix.go
  4. 62
      fs-dir-others.go
  5. 306
      fs-dir.go
  6. 25
      fs-dirent-fileno.go
  7. 25
      fs-dirent-ino.go
  8. 17
      fs-object.go
  9. 30
      fs.go

@ -20,18 +20,30 @@ import (
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"github.com/minio/minio/pkg/probe"
)
const (
// listObjectsLimit - maximum list objects limit.
listObjectsLimit = 1000
)
// isDirExist - returns whether given directory is exist or not.
func isDirExist(dirname string) (status bool, err error) {
fi, err := os.Lstat(dirname)
if err == nil {
status = fi.IsDir()
}
return
}
// ListObjects - lists all objects for a given prefix, returns up to
// maxKeys number of objects per call.
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, *probe.Error) {
result := ListObjectsInfo{}
var queryPrefix string
// Input validation.
if !IsValidBucketName(bucket) {
@ -58,7 +70,7 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != "/" {
return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter))
return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported. Only '/' is supported", delimiter))
}
// Marker is set unescape.
@ -72,7 +84,6 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
if !strings.HasPrefix(marker, prefix) {
return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", marker, prefix))
}
}
// Return empty response for a valid request when maxKeys is 0.
@ -100,79 +111,50 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
}
recursive := true
skipDir := true
if delimiter == "/" {
skipDir = false
recursive = false
}
// Maximum 1000 objects returned in a single to listObjects.
// Further calls will set right marker value to continue reading the rest of the objectList.
// popListObjectCh returns nil if the call to ListObject is done for the first time.
// popTreeWalker returns nil if the call to ListObject is done for the first time.
// On further calls to ListObjects to retrive more objects within the timeout period,
// popListObjectCh returns the channel from which rest of the objects can be retrieved.
objectInfoCh := fs.popListObjectCh(listObjectParams{bucket, delimiter, marker, prefix})
if objectInfoCh == nil {
if prefix != "" {
// queryPrefix variable is set to value of the prefix to be searched.
// If prefix contains directory hierarchy queryPrefix is set to empty string,
// this ensure that all objects inside the prefixDir is listed.
// Otherwise the base name is extracted from path.Base and it'll be will be set to Querystring.
// if prefix = /Asia/India/, queryPrefix will be set to empty string(""), so that all objects in prefixDir are listed.
// if prefix = /Asia/India/summerpics , Querystring will be set to "summerpics",
// so those all objects with the prefix "summerpics" inside the /Asia/India/ prefix folder gets listed.
if prefix[len(prefix)-1:] == "/" {
queryPrefix = ""
} else {
queryPrefix = path.Base(prefix)
}
}
ch := treeWalk(rootDir, bucketDir, recursive, queryPrefix)
objectInfoCh = &ch
// popTreeWalker returns the channel from which rest of the objects can be retrieved.
walker := fs.popTreeWalker(listObjectParams{bucket, delimiter, marker, prefix})
if walker == nil {
walker = startTreeWalker(fs.path, bucket, filepath.FromSlash(prefix), filepath.FromSlash(marker), recursive)
}
nextMarker := ""
for i := 0; i < maxKeys; {
objInfo, ok := objectInfoCh.Read()
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
return result, nil
}
if objInfo.Err != nil {
return ListObjectsInfo{}, probe.NewError(objInfo.Err)
if walkResult.err != nil {
return ListObjectsInfo{}, probe.NewError(walkResult.err)
}
objInfo := walkResult.objectInfo
objInfo.Name = filepath.ToSlash(objInfo.Name)
if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") {
continue
}
if objInfo.IsDir && skipDir {
continue
if objInfo.IsDir {
result.Prefixes = append(result.Prefixes, objInfo.Name)
} else {
result.Objects = append(result.Objects, objInfo)
}
// Add the bucket.
objInfo.Bucket = bucket
// In case its not the first call to ListObjects (before timeout),
// The result is already inside the buffered channel.
if objInfo.Name > marker {
if objInfo.IsDir {
result.Prefixes = append(result.Prefixes, objInfo.Name)
} else {
result.Objects = append(result.Objects, objInfo)
}
nextMarker = objInfo.Name
i++
if walkResult.end {
return result, nil
}
}
if !objectInfoCh.IsClosed() {
result.IsTruncated = true
result.NextMarker = nextMarker
fs.pushListObjectCh(listObjectParams{bucket, delimiter, nextMarker, prefix}, *objectInfoCh)
nextMarker = objInfo.Name
i++
}
result.IsTruncated = true
result.NextMarker = nextMarker
fs.pushTreeWalker(listObjectParams{bucket, delimiter, nextMarker, prefix}, walker)
return result, nil
}

@ -0,0 +1,193 @@
package main
import (
"os"
"path/filepath"
"sort"
"strings"
"time"
)
type fsDirent struct {
name string
modifiedTime time.Time // On unix this is empty.
size int64 // On unix this is empty.
isDir bool
}
type fsDirents []fsDirent
func (d fsDirents) Len() int { return len(d) }
func (d fsDirents) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d fsDirents) Less(i, j int) bool {
n1 := d[i].name
if d[i].isDir {
n1 = n1 + string(os.PathSeparator)
}
n2 := d[j].name
if d[j].isDir {
n2 = n2 + string(os.PathSeparator)
}
return n1 < n2
}
// Using sort.Search() internally to jump to the file entry containing the prefix.
func searchDirents(dirents []fsDirent, x string) int {
processFunc := func(i int) bool {
return dirents[i].name >= x
}
return sort.Search(len(dirents), processFunc)
}
type treeWalkResult struct {
objectInfo ObjectInfo
err error
end bool
}
type treeWalker struct {
ch <-chan treeWalkResult
timedOut bool
}
// treeWalk walks FS directory tree recursively pushing ObjectInfo into the channel as and when it encounters files.
func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
// convert dirent to ObjectInfo
direntToObjectInfo := func(dirent fsDirent) (ObjectInfo, error) {
objectInfo := ObjectInfo{}
// objectInfo.Name has the full object name
objectInfo.Name = filepath.Join(prefixDir, dirent.name)
if dirent.modifiedTime.IsZero() && dirent.size == 0 {
// On linux/darwin/*bsd. Refer dir_nix.go:parseDirents() for details.
// ModifiedTime and Size are zero, Stat() and figure out
// the actual values that need to be set.
fi, err := os.Stat(filepath.Join(bucketDir, prefixDir, dirent.name))
if err != nil {
return ObjectInfo{}, err
}
objectInfo.ModifiedTime = fi.ModTime()
objectInfo.Size = fi.Size()
objectInfo.IsDir = fi.IsDir()
} else {
// On windows. Refer dir_others.go:parseDirents() for details.
// If ModifiedTime or Size are set then use them
// without attempting another Stat operation.
objectInfo.ModifiedTime = dirent.modifiedTime
objectInfo.Size = dirent.size
objectInfo.IsDir = dirent.isDir
}
if objectInfo.IsDir {
// Add os.PathSeparator suffix again as filepath would have removed it
objectInfo.Size = 0
objectInfo.Name += string(os.PathSeparator)
}
return objectInfo, nil
}
markerPart := ""
markerRest := ""
if marker != "" {
// ex: if marker="four/five.txt", markerPart="four/" markerRest="five.txt"
markerSplit := strings.SplitN(marker, string(os.PathSeparator), 2)
markerPart = markerSplit[0]
if len(markerSplit) == 2 {
markerPart += string(os.PathSeparator)
markerRest = markerSplit[1]
}
}
// readDirAll returns entries that begins with entryPrefixMatch
dirents, err := readDirAll(filepath.Join(bucketDir, prefixDir), entryPrefixMatch)
if err != nil {
send(treeWalkResult{err: err})
return false
}
// example:
// If markerPart="four/" searchDirents() returns the index of "four/" in the sorted
// dirents list. We skip all the dirent entries till "four/"
dirents = dirents[searchDirents(dirents, markerPart):]
*count += len(dirents)
for i, dirent := range dirents {
if i == 0 && markerPart == dirent.name && !dirent.isDir {
// If the first entry is not a directory
// we need to skip this entry.
*count--
continue
}
if dirent.isDir && recursive {
// If the entry is a directory, we will need recurse into it.
markerArg := ""
if dirent.name == markerPart {
// we need to pass "five.txt" as marker only if we are recursing into "four/"
markerArg = markerRest
}
*count--
if !treeWalk(bucketDir, filepath.Join(prefixDir, dirent.name), "", markerArg, recursive, send, count) {
return false
}
continue
}
objectInfo, err := direntToObjectInfo(dirent)
if err != nil {
send(treeWalkResult{err: err})
return false
}
*count--
if !send(treeWalkResult{objectInfo: objectInfo}) {
return false
}
}
return true
}
// Initiate a new treeWalk in a goroutine.
func startTreeWalker(fsPath, bucket, prefix, marker string, recursive bool) *treeWalker {
// Example 1
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
// and entryPrefixMatch=""
// Example 2
// if prefix is "one/two/th" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt"
// and entryPrefixMatch="th"
ch := make(chan treeWalkResult, listObjectsLimit)
walker := treeWalker{ch: ch}
entryPrefixMatch := prefix
prefixDir := ""
lastIndex := strings.LastIndex(prefix, string(os.PathSeparator))
if lastIndex != -1 {
entryPrefixMatch = prefix[lastIndex+1:]
prefixDir = prefix[:lastIndex+1]
}
count := 0
marker = strings.TrimPrefix(marker, prefixDir)
go func() {
defer close(ch)
send := func(walkResult treeWalkResult) bool {
// Add the bucket.
walkResult.objectInfo.Bucket = bucket
if count == 0 {
walkResult.end = true
}
timer := time.After(time.Second * 60)
select {
case ch <- walkResult:
return true
case <-timer:
walker.timedOut = true
return false
}
}
treeWalk(filepath.Join(fsPath, bucket), prefixDir, entryPrefixMatch, marker, recursive, send, &count)
}()
return &walker
}

@ -0,0 +1,101 @@
// +build linux darwin dragonfly freebsd netbsd openbsd
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"os"
"runtime"
"sort"
"strings"
"syscall"
"unsafe"
)
const (
// large enough buffer size for ReadDirent() syscall
readDirentBufSize = 4096 * 25
)
// actual length of the byte array from the c - world.
func clen(n []byte) int {
for i := 0; i < len(n); i++ {
if n[i] == 0 {
return i
}
}
return len(n)
}
// parseDirents - inspired from syscall_<os>.go:parseDirents()
func parseDirents(buf []byte) []fsDirent {
bufidx := 0
dirents := []fsDirent{}
for bufidx < len(buf) {
dirent := (*syscall.Dirent)(unsafe.Pointer(&buf[bufidx]))
bufidx += int(dirent.Reclen)
if skipDirent(dirent) {
continue
}
if runtime.GOOS != "linux" {
if dirent.Reclen == 0 {
break
}
}
bytes := (*[10000]byte)(unsafe.Pointer(&dirent.Name[0]))
var name = string(bytes[0:clen(bytes[:])])
if name == "." || name == ".." { // Useless names
continue
}
dirents = append(dirents, fsDirent{
name: name,
isDir: dirent.Type == syscall.DT_DIR,
})
}
return dirents
}
func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
buf := make([]byte, readDirentBufSize)
f, err := os.Open(readDirPath)
if err != nil {
return nil, err
}
defer f.Close()
dirents := []fsDirent{}
for {
nbuf, err := syscall.ReadDirent(int(f.Fd()), buf)
if err != nil {
return nil, err
}
if nbuf <= 0 {
break
}
for _, dirent := range parseDirents(buf[:nbuf]) {
if dirent.isDir {
dirent.name += string(os.PathSeparator)
dirent.size = 0
}
if strings.HasPrefix(dirent.name, entryPrefixMatch) {
dirents = append(dirents, dirent)
}
}
}
sort.Sort(fsDirents(dirents))
return dirents, nil
}

@ -0,0 +1,62 @@
// +build !linux,!darwin,!openbsd,!freebsd,!netbsd,!dragonfly
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"io"
"os"
"sort"
"strings"
)
func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
f, err := os.Open(readDirPath)
if err != nil {
return nil, err
}
defer f.Close()
var dirents []fsDirent
for {
fis, err := f.Readdir(1000)
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
for _, fi := range fis {
dirent := fsDirent{
name: fi.Name(),
size: fi.Size(),
modifiedTime: fi.ModTime(),
isDir: fi.IsDir(),
}
if dirent.isDir {
dirent.name += string(os.PathSeparator)
dirent.size = 0
}
if strings.HasPrefix(fi.Name(), entryPrefixMatch) {
dirents = append(dirents, dirent)
}
}
}
// Sort dirents.
sort.Sort(fsDirents(dirents))
return dirents, nil
}

@ -1,306 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
const (
// listObjectsLimit - maximum list objects limit.
listObjectsLimit = 1000
)
// isDirEmpty - returns whether given directory is empty or not.
func isDirEmpty(dirname string) (status bool, err error) {
f, err := os.Open(dirname)
if err == nil {
defer f.Close()
if _, err = f.Readdirnames(1); err == io.EOF {
status = true
err = nil
}
}
return
}
// isDirExist - returns whether given directory is exist or not.
func isDirExist(dirname string) (status bool, err error) {
fi, err := os.Lstat(dirname)
if err == nil {
status = fi.IsDir()
}
return
}
// byName implements sort.Interface for sorting os.FileInfo list.
type byName []os.FileInfo
func (f byName) Len() int {
return len(f)
}
func (f byName) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func (f byName) Less(i, j int) bool {
n1 := f[i].Name()
if f[i].IsDir() {
n1 = n1 + string(os.PathSeparator)
}
n2 := f[j].Name()
if f[j].IsDir() {
n2 = n2 + string(os.PathSeparator)
}
return n1 < n2
}
// Using sort.Search() internally to jump to the file entry containing the prefix.
func searchFileInfos(fileInfos []os.FileInfo, x string) int {
processFunc := func(i int) bool {
return fileInfos[i].Name() >= x
}
return sort.Search(len(fileInfos), processFunc)
}
// readDir - read 'scanDir' directory. It returns list of ObjectInfo.
// Each object name is appended with 'namePrefix'.
func readDir(scanDir, namePrefix, queryPrefix string, isFirst bool) (objInfos []ObjectInfo) {
f, err := os.Open(scanDir)
if err != nil {
objInfos = append(objInfos, ObjectInfo{Err: err})
return
}
fis, err := f.Readdir(-1)
if err != nil {
f.Close()
objInfos = append(objInfos, ObjectInfo{Err: err})
return
}
// Close the directory.
f.Close()
// Sort files by Name.
sort.Sort(byName(fis))
var prefixIndex int
// Searching for entries with objectName containing prefix.
// Binary search is used for efficient search.
if queryPrefix != "" && isFirst {
prefixIndex = searchFileInfos(fis, queryPrefix)
if prefixIndex == len(fis) {
return
}
if !strings.HasPrefix(fis[prefixIndex].Name(), queryPrefix) {
return
}
fis = fis[prefixIndex:]
}
// Populate []ObjectInfo from []FileInfo.
for _, fi := range fis {
name := fi.Name()
if queryPrefix != "" && isFirst {
// If control is here then there is a queryPrefix, and there are objects which satisfies the prefix.
// Since the result is sorted, the object names which satisfies query prefix would be stored one after the other.
// Push the ObjectInfo only if its contains the prefix.
// This ensures that the channel containing object Info would only has objects with the given queryPrefix.
if !strings.HasPrefix(name, queryPrefix) {
return
}
}
size := fi.Size()
modTime := fi.ModTime()
isDir := fi.Mode().IsDir()
// Add prefix if name prefix exists.
if namePrefix != "" {
name = namePrefix + "/" + name
}
// For directories explicitly end with '/'.
if isDir {
name += "/"
// size is set to '0' for directories explicitly.
size = 0
}
if fi.Mode()&os.ModeSymlink == os.ModeSymlink {
// Handle symlink by doing an additional stat and follow the link.
st, e := os.Stat(filepath.Join(scanDir, name))
if e != nil {
objInfos = append(objInfos, ObjectInfo{Err: err})
return
}
size = st.Size()
modTime = st.ModTime()
isDir = st.Mode().IsDir()
// For directories explicitly end with '/'.
if isDir {
name += "/"
// size is set to '0' for directories explicitly.
size = 0
}
}
// Populate []ObjectInfo.
objInfos = append(objInfos, ObjectInfo{
Name: name,
ModifiedTime: modTime,
MD5Sum: "", // TODO
Size: size,
IsDir: isDir,
})
}
return
}
// objectInfoChannel - object info channel.
type objectInfoChannel struct {
ch <-chan ObjectInfo
objInfo *ObjectInfo
closed bool
timeoutCh <-chan struct{}
timedOut bool
}
func (oic *objectInfoChannel) Read() (ObjectInfo, bool) {
if oic.closed {
return ObjectInfo{}, false
}
if oic.objInfo == nil {
// First read.
if oi, ok := <-oic.ch; ok {
oic.objInfo = &oi
} else {
oic.closed = true
return ObjectInfo{}, false
}
}
retObjInfo := *oic.objInfo
status := true
oic.objInfo = nil
// Read once more to know whether it was last read.
if oi, ok := <-oic.ch; ok {
oic.objInfo = &oi
} else {
oic.closed = true
}
return retObjInfo, status
}
// IsClosed - return whether channel is closed or not.
func (oic objectInfoChannel) IsClosed() bool {
if oic.objInfo != nil {
return false
}
return oic.closed
}
// IsTimedOut - return whether channel is closed due to timeout.
func (oic objectInfoChannel) IsTimedOut() bool {
if oic.timedOut {
return true
}
select {
case _, ok := <-oic.timeoutCh:
if ok {
oic.timedOut = true
return true
}
return false
default:
return false
}
}
// treeWalk - walk into 'scanDir' recursively when 'recursive' is true.
// It uses 'bucketDir' to get name prefix for object name.
func treeWalk(scanDir, bucketDir string, recursive bool, queryPrefix string) objectInfoChannel {
objectInfoCh := make(chan ObjectInfo, listObjectsLimit)
timeoutCh := make(chan struct{}, 1)
// goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel.
go func() {
defer close(objectInfoCh)
defer close(timeoutCh)
// send function - returns true if ObjectInfo is sent.
// Within (time.Second * 15) else false on time-out.
send := func(oi ObjectInfo) bool {
timer := time.After(time.Second * 15)
select {
case objectInfoCh <- oi:
return true
case <-timer:
timeoutCh <- struct{}{}
return false
}
}
namePrefix := strings.Replace(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir), "", 1)
if strings.HasPrefix(namePrefix, "/") {
// Remove forward slash ("/") from beginning.
namePrefix = namePrefix[1:]
}
// The last argument (isFisrt), is set to `true` only during the first run of the function.
// This makes sure that the sub-directories inside the prefixDir are recursed
// without being asserted for prefix in the object name.
isFirst := true
for objInfos := readDir(scanDir, namePrefix, queryPrefix, isFirst); len(objInfos) > 0; {
var objInfo ObjectInfo
objInfo, objInfos = objInfos[0], objInfos[1:]
if !send(objInfo) {
return
}
if objInfo.IsDir && recursive {
scanDir := filepath.Join(bucketDir, filepath.FromSlash(objInfo.Name))
namePrefix = strings.Replace(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir), "", 1)
if strings.HasPrefix(namePrefix, "/") {
/* remove beginning "/" */
namePrefix = namePrefix[1:]
}
// The last argument is set to false in the further calls to readdir.
isFirst = false
objInfos = append(readDir(scanDir, namePrefix, queryPrefix, isFirst), objInfos...)
}
}
}()
return objectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh}
}

@ -0,0 +1,25 @@
// +build openbsd netbsd freebsd dragonfly
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "syscall"
func skipDirent(dirent *syscall.Dirent) bool {
return dirent.Fileno == 0
}

@ -0,0 +1,25 @@
// +build darwin linux
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "syscall"
func skipDirent(dirent *syscall.Dirent) bool {
return dirent.Ino == 0
}

@ -33,6 +33,23 @@ import (
"github.com/minio/minio/pkg/probe"
)
// isDirEmpty - returns whether given directory is empty or not.
func isDirEmpty(dirname string) (status bool, err error) {
f, err := os.Open(dirname)
if err == nil {
defer f.Close()
if _, err = f.Readdirnames(1); err == io.EOF {
status = true
err = nil
}
}
return
}
/// Object Operations
// GetObject - GET object

30
fs.go

@ -39,7 +39,7 @@ type Filesystem struct {
minFreeDisk int64
rwLock *sync.RWMutex
multiparts *multiparts
listObjectMap map[listObjectParams][]objectInfoChannel
listObjectMap map[listObjectParams][]*treeWalker
listObjectMapMutex *sync.Mutex
}
@ -58,33 +58,31 @@ type multiparts struct {
ActiveSession map[string]*multipartSession `json:"activeSessions"`
}
func (fs *Filesystem) pushListObjectCh(params listObjectParams, ch objectInfoChannel) {
func (fs *Filesystem) pushTreeWalker(params listObjectParams, walker *treeWalker) {
fs.listObjectMapMutex.Lock()
defer fs.listObjectMapMutex.Unlock()
channels := []objectInfoChannel{ch}
if _, ok := fs.listObjectMap[params]; ok {
channels = append(fs.listObjectMap[params], ch)
}
walkers, _ := fs.listObjectMap[params]
walkers = append(walkers, walker)
fs.listObjectMap[params] = channels
fs.listObjectMap[params] = walkers
}
func (fs *Filesystem) popListObjectCh(params listObjectParams) *objectInfoChannel {
func (fs *Filesystem) popTreeWalker(params listObjectParams) *treeWalker {
fs.listObjectMapMutex.Lock()
defer fs.listObjectMapMutex.Unlock()
if channels, ok := fs.listObjectMap[params]; ok {
for i, channel := range channels {
if !channel.IsTimedOut() {
chs := channels[i+1:]
if len(chs) > 0 {
fs.listObjectMap[params] = chs
if walkers, ok := fs.listObjectMap[params]; ok {
for i, walker := range walkers {
if !walker.timedOut {
newWalkers := walkers[i+1:]
if len(newWalkers) > 0 {
fs.listObjectMap[params] = newWalkers
} else {
delete(fs.listObjectMap, params)
}
return &channel
return walker
}
}
@ -128,7 +126,7 @@ func newFS(rootPath string) (ObjectAPI, *probe.Error) {
// Minium free disk required for i/o operations to succeed.
fs.minFreeDisk = 5
fs.listObjectMap = make(map[listObjectParams][]objectInfoChannel)
fs.listObjectMap = make(map[listObjectParams][]*treeWalker)
fs.listObjectMapMutex = &sync.Mutex{}
// Return here.

Loading…
Cancel
Save