You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
396 lines
9.2 KiB
396 lines
9.2 KiB
9 years ago
|
/*
|
||
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||
|
*
|
||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
* you may not use this file except in compliance with the License.
|
||
|
* You may obtain a copy of the License at
|
||
|
*
|
||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||
|
*
|
||
|
* Unless required by applicable law or agreed to in writing, software
|
||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
* See the License for the specific language governing permissions and
|
||
|
* limitations under the License.
|
||
|
*/
|
||
|
|
||
|
package main
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"io"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"sort"
|
||
|
"strings"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// DirEntry - directory entry
|
||
|
type DirEntry struct {
|
||
|
Name string
|
||
|
Size int64
|
||
|
Mode os.FileMode
|
||
|
ModTime time.Time
|
||
|
}
|
||
|
|
||
|
// IsDir - returns true if DirEntry is a directory
|
||
|
func (entry DirEntry) IsDir() bool {
|
||
|
return entry.Mode.IsDir()
|
||
|
}
|
||
|
|
||
|
// IsSymlink - returns true if DirEntry is a symbolic link
|
||
|
func (entry DirEntry) IsSymlink() bool {
|
||
|
return entry.Mode&os.ModeSymlink == os.ModeSymlink
|
||
|
}
|
||
|
|
||
|
// IsRegular - returns true if DirEntry is a regular file
|
||
|
func (entry DirEntry) IsRegular() bool {
|
||
|
return entry.Mode.IsRegular()
|
||
|
}
|
||
|
|
||
|
// sort interface for DirEntry slice
|
||
|
type byEntryName []DirEntry
|
||
|
|
||
|
func (f byEntryName) Len() int { return len(f) }
|
||
|
func (f byEntryName) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
|
||
|
func (f byEntryName) Less(i, j int) bool { return f[i].Name < f[j].Name }
|
||
|
|
||
|
func filteredReaddir(dirname string, filter func(DirEntry) bool, appendPath bool) ([]DirEntry, error) {
|
||
|
result := []DirEntry{}
|
||
|
|
||
|
d, err := os.Open(dirname)
|
||
|
if err != nil {
|
||
|
return result, err
|
||
|
}
|
||
|
|
||
|
defer d.Close()
|
||
|
|
||
|
for {
|
||
|
fis, err := d.Readdir(1000)
|
||
|
if err != nil {
|
||
|
if err == io.EOF {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
return result, err
|
||
|
}
|
||
|
|
||
|
for _, fi := range fis {
|
||
|
name := fi.Name()
|
||
|
if appendPath {
|
||
|
name = filepath.Join(dirname, name)
|
||
|
}
|
||
|
|
||
|
if fi.IsDir() {
|
||
|
name += string(os.PathSeparator)
|
||
|
}
|
||
|
|
||
|
entry := DirEntry{Name: name, Size: fi.Size(), Mode: fi.Mode(), ModTime: fi.ModTime()}
|
||
|
|
||
|
if filter == nil || filter(entry) {
|
||
|
result = append(result, entry)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
sort.Sort(byEntryName(result))
|
||
|
|
||
|
return result, nil
|
||
|
}
|
||
|
|
||
|
func filteredReaddirnames(dirname string, filter func(string) bool) ([]string, error) {
|
||
|
result := []string{}
|
||
|
d, err := os.Open(dirname)
|
||
|
if err != nil {
|
||
|
return result, err
|
||
|
}
|
||
|
|
||
|
defer d.Close()
|
||
|
|
||
|
for {
|
||
|
names, err := d.Readdirnames(1000)
|
||
|
if err != nil {
|
||
|
if err == io.EOF {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
return result, err
|
||
|
}
|
||
|
|
||
|
for _, name := range names {
|
||
|
if filter == nil || filter(name) {
|
||
|
result = append(result, name)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
sort.Strings(result)
|
||
|
|
||
|
return result, nil
|
||
|
}
|
||
|
|
||
|
func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) multipartObjectInfoChannel {
|
||
|
objectInfoCh := make(chan multipartObjectInfo, listObjectsLimit)
|
||
|
timeoutCh := make(chan struct{}, 1)
|
||
|
|
||
|
// TODO: check if bucketDir is absolute path
|
||
|
scanDir := bucketDir
|
||
|
dirDepth := bucketDir
|
||
|
|
||
|
if prefixPath != "" {
|
||
|
if !filepath.IsAbs(prefixPath) {
|
||
|
tmpPrefixPath := filepath.Join(bucketDir, prefixPath)
|
||
|
if strings.HasSuffix(prefixPath, string(os.PathSeparator)) {
|
||
|
tmpPrefixPath += string(os.PathSeparator)
|
||
|
}
|
||
|
prefixPath = tmpPrefixPath
|
||
|
}
|
||
|
|
||
|
// TODO: check if prefixPath starts with bucketDir
|
||
|
|
||
|
// Case #1: if prefixPath is /mnt/mys3/mybucket/2012/photos/paris, then
|
||
|
// dirDepth is /mnt/mys3/mybucket/2012/photos
|
||
|
// Case #2: if prefixPath is /mnt/mys3/mybucket/2012/photos/, then
|
||
|
// dirDepth is /mnt/mys3/mybucket/2012/photos
|
||
|
dirDepth = filepath.Dir(prefixPath)
|
||
|
scanDir = dirDepth
|
||
|
} else {
|
||
|
prefixPath = bucketDir
|
||
|
}
|
||
|
|
||
|
if markerPath != "" {
|
||
|
if !filepath.IsAbs(markerPath) {
|
||
|
tmpMarkerPath := filepath.Join(bucketDir, markerPath)
|
||
|
if strings.HasSuffix(markerPath, string(os.PathSeparator)) {
|
||
|
tmpMarkerPath += string(os.PathSeparator)
|
||
|
}
|
||
|
|
||
|
markerPath = tmpMarkerPath
|
||
|
}
|
||
|
|
||
|
// TODO: check markerPath must be a file
|
||
|
if uploadIDMarker != "" {
|
||
|
markerPath = filepath.Join(markerPath, uploadIDMarker+uploadIDSuffix)
|
||
|
}
|
||
|
|
||
|
// TODO: check if markerPath starts with bucketDir
|
||
|
// TODO: check if markerPath starts with prefixPath
|
||
|
|
||
|
// Case #1: if markerPath is /mnt/mys3/mybucket/2012/photos/gophercon.png, then
|
||
|
// scanDir is /mnt/mys3/mybucket/2012/photos
|
||
|
// Case #2: if markerPath is /mnt/mys3/mybucket/2012/photos/gophercon.png/1fbd117a-268a-4ed0-85c9-8cc3888cbf20.uploadid, then
|
||
|
// scanDir is /mnt/mys3/mybucket/2012/photos/gophercon.png
|
||
|
// Case #3: if markerPath is /mnt/mys3/mybucket/2012/photos/, then
|
||
|
// scanDir is /mnt/mys3/mybucket/2012/photos
|
||
|
|
||
|
scanDir = filepath.Dir(markerPath)
|
||
|
} else {
|
||
|
markerPath = bucketDir
|
||
|
}
|
||
|
|
||
|
// Have bucketDir ends with os.PathSeparator
|
||
|
if !strings.HasSuffix(bucketDir, string(os.PathSeparator)) {
|
||
|
bucketDir += string(os.PathSeparator)
|
||
|
}
|
||
|
|
||
|
// Remove os.PathSeparator if scanDir ends with
|
||
|
if strings.HasSuffix(scanDir, string(os.PathSeparator)) {
|
||
|
scanDir = filepath.Dir(scanDir)
|
||
|
}
|
||
|
|
||
|
// goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel.
|
||
|
go func() {
|
||
|
defer close(objectInfoCh)
|
||
|
defer close(timeoutCh)
|
||
|
|
||
|
// send function - returns true if ObjectInfo is sent
|
||
|
// within (time.Second * 15) else false on timeout.
|
||
|
send := func(oi multipartObjectInfo) bool {
|
||
|
timer := time.After(time.Second * 15)
|
||
|
select {
|
||
|
case objectInfoCh <- oi:
|
||
|
return true
|
||
|
case <-timer:
|
||
|
timeoutCh <- struct{}{}
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for {
|
||
|
entries, err := filteredReaddir(scanDir,
|
||
|
func(entry DirEntry) bool {
|
||
|
if entry.IsDir() || (entry.IsRegular() && strings.HasSuffix(entry.Name, uploadIDSuffix)) {
|
||
|
return strings.HasPrefix(entry.Name, prefixPath) && entry.Name > markerPath
|
||
|
}
|
||
|
|
||
|
return false
|
||
|
},
|
||
|
true)
|
||
|
if err != nil {
|
||
|
send(multipartObjectInfo{Err: err})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
var entry DirEntry
|
||
|
for len(entries) > 0 {
|
||
|
entry, entries = entries[0], entries[1:]
|
||
|
|
||
|
if entry.IsRegular() {
|
||
|
// Handle uploadid file
|
||
|
name := strings.Replace(filepath.Dir(entry.Name), bucketDir, "", 1)
|
||
|
if name == "" {
|
||
|
// This should not happen ie uploadid file should not be in bucket directory
|
||
|
send(multipartObjectInfo{Err: errors.New("corrupted meta data")})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
uploadID := strings.Split(filepath.Base(entry.Name), uploadIDSuffix)[0]
|
||
|
|
||
|
objInfo := multipartObjectInfo{
|
||
|
Name: name,
|
||
|
UploadID: uploadID,
|
||
|
ModifiedTime: entry.ModTime,
|
||
|
}
|
||
|
|
||
|
if !send(objInfo) {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
subentries, err := filteredReaddir(entry.Name,
|
||
|
func(entry DirEntry) bool {
|
||
|
return entry.IsDir() || (entry.IsRegular() && strings.HasSuffix(entry.Name, uploadIDSuffix))
|
||
|
},
|
||
|
true)
|
||
|
if err != nil {
|
||
|
send(multipartObjectInfo{Err: err})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
subDirFound := false
|
||
|
uploadIDEntries := []DirEntry{}
|
||
|
// If subentries has a directory, then current entry needs to be sent
|
||
|
for _, subentry := range subentries {
|
||
|
if subentry.IsDir() {
|
||
|
subDirFound = true
|
||
|
|
||
|
if recursive {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !recursive && subentry.IsRegular() {
|
||
|
uploadIDEntries = append(uploadIDEntries, subentry)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if subDirFound || len(subentries) == 0 {
|
||
|
objInfo := multipartObjectInfo{
|
||
|
Name: strings.Replace(entry.Name, bucketDir, "", 1),
|
||
|
ModifiedTime: entry.ModTime,
|
||
|
IsDir: true,
|
||
|
}
|
||
|
|
||
|
if !send(objInfo) {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if recursive {
|
||
|
entries = append(subentries, entries...)
|
||
|
} else {
|
||
|
entries = append(uploadIDEntries, entries...)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !recursive {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
markerPath = scanDir + string(os.PathSeparator)
|
||
|
|
||
|
if scanDir = filepath.Dir(scanDir); scanDir < dirDepth {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return multipartObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh}
|
||
|
}
|
||
|
|
||
|
// multipartObjectInfo - Multipart object info
|
||
|
type multipartObjectInfo struct {
|
||
|
Name string
|
||
|
UploadID string
|
||
|
ModifiedTime time.Time
|
||
|
IsDir bool
|
||
|
Err error
|
||
|
}
|
||
|
|
||
|
// multipartObjectInfoChannel - multipart object info channel
|
||
|
type multipartObjectInfoChannel struct {
|
||
|
ch <-chan multipartObjectInfo
|
||
|
objInfo *multipartObjectInfo
|
||
|
closed bool
|
||
|
timeoutCh <-chan struct{}
|
||
|
timedOut bool
|
||
|
}
|
||
|
|
||
|
func (oic *multipartObjectInfoChannel) Read() (multipartObjectInfo, bool) {
|
||
|
if oic.closed {
|
||
|
return multipartObjectInfo{}, false
|
||
|
}
|
||
|
|
||
|
if oic.objInfo == nil {
|
||
|
// First read.
|
||
|
if oi, ok := <-oic.ch; ok {
|
||
|
oic.objInfo = &oi
|
||
|
} else {
|
||
|
oic.closed = true
|
||
|
return multipartObjectInfo{}, false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
retObjInfo := *oic.objInfo
|
||
|
status := true
|
||
|
oic.objInfo = nil
|
||
|
|
||
|
// Read once more to know whether it was last read.
|
||
|
if oi, ok := <-oic.ch; ok {
|
||
|
oic.objInfo = &oi
|
||
|
} else {
|
||
|
oic.closed = true
|
||
|
}
|
||
|
|
||
|
return retObjInfo, status
|
||
|
}
|
||
|
|
||
|
// IsClosed - return whether channel is closed or not.
|
||
|
func (oic multipartObjectInfoChannel) IsClosed() bool {
|
||
|
if oic.objInfo != nil {
|
||
|
return false
|
||
|
}
|
||
|
return oic.closed
|
||
|
}
|
||
|
|
||
|
// IsTimedOut - return whether channel is closed due to timeout.
|
||
|
func (oic multipartObjectInfoChannel) IsTimedOut() bool {
|
||
|
if oic.timedOut {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case _, ok := <-oic.timeoutCh:
|
||
|
if ok {
|
||
|
oic.timedOut = true
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
default:
|
||
|
return false
|
||
|
}
|
||
|
}
|