|
|
|
@@ -57,7 +57,14 @@ func removeFileTree(fileName string, level string) error {
 	return nil
 }
 
-func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error {
+func safeRemoveFile(file *os.File) error {
+	if e := file.Close(); e != nil {
+		return e
+	}
+	return os.Remove(file.Name())
+}
+
+func safeWriteFile(fileName string, data io.Reader, size int64, md5sum string) error {
 	tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-")
 	if e != nil {
 		return e
@@ -66,8 +73,7 @@ func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error
 	md5Hasher := md5.New()
 	multiWriter := io.MultiWriter(md5Hasher, tempFile)
 	if _, e := io.CopyN(multiWriter, data, size); e != nil {
-		tempFile.Close()
-		os.Remove(tempFile.Name())
+		safeRemoveFile(tempFile)
 		return e
 	}
 	tempFile.Close()
@@ -284,8 +290,8 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i
 		return "", probe.NewError(e)
 	}
 
-	partFile := filepath.Join(fs.path, configDir, bucket, object, uploadID+"."+strconv.Itoa(partNumber)+"."+md5Hex)
-	if e := safeWrite(partFile, data, size, md5Hex); e != nil {
+	partFile := filepath.Join(fs.path, configDir, bucket, object, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5Hex))
+	if e := safeWriteFile(partFile, data, size, md5Hex); e != nil {
 		return "", probe.NewError(e)
 	}
 
@@ -356,34 +362,30 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
 
 	tempFile, e := ioutil.TempFile(metaObjectDir, uploadID+".complete.")
 	if e != nil {
-		//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
 		return ObjectInfo{}, probe.NewError(e)
 	}
 
 	for _, part := range parts {
 		partNumber := part.PartNumber
-		md5sum := strings.Trim(part.ETag, "\"")
-		partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum)
-		var f *os.File
-		f, e = os.Open(partFile)
+		// Trim off the odd double quotes from ETag in the beginning and end.
+		md5sum := strings.TrimPrefix(part.ETag, "\"")
+		md5sum = strings.TrimSuffix(md5sum, "\"")
+		partFileStr := filepath.Join(metaObjectDir, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5sum))
+		var partFile *os.File
+		partFile, e = os.Open(partFileStr)
 		if e != nil {
-			tempFile.Close()
-			os.Remove(tempFile.Name())
-			//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
+			safeRemoveFile(tempFile)
 			return ObjectInfo{}, probe.NewError(e)
-		} else if _, e = io.Copy(tempFile, f); e != nil {
-			tempFile.Close()
-			os.Remove(tempFile.Name())
-			//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
+		} else if _, e = io.Copy(tempFile, partFile); e != nil {
+			safeRemoveFile(tempFile)
 			return ObjectInfo{}, probe.NewError(e)
 		}
-		f.Close()
+		partFile.Close() // Close part file after successful copy.
 	}
 	tempFile.Close()
-
-	// fi is used later
-	fi, e := os.Stat(tempFile.Name())
+	// Stat to gather fresh stat info.
+	objSt, e := os.Stat(tempFile.Name())
 	if e != nil {
 		os.Remove(tempFile.Name())
 		return ObjectInfo{}, probe.NewError(e)
 	}
@@ -391,7 +393,6 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
 	objectPath := filepath.Join(bucketPath, object)
 	if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil {
 		os.Remove(tempFile.Name())
-		//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
 		return ObjectInfo{}, probe.NewError(e)
 	}
 	if e = os.Rename(tempFile.Name(), objectPath); e != nil {
@@ -411,8 +412,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
 	newObject := ObjectInfo{
 		Bucket:       bucket,
 		Name:         object,
-		ModifiedTime: fi.ModTime(),
-		Size:         fi.Size(),
+		ModifiedTime: objSt.ModTime(),
+		Size:         objSt.Size(),
 		ContentType:  contentType,
 		MD5Sum:       s3MD5,
 	}
@@ -482,18 +483,14 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
 	}
 
 	markerPath := filepath.FromSlash(keyMarker)
-
 	if uploadIDMarker != "" {
 		if strings.HasSuffix(markerPath, string(os.PathSeparator)) {
 			return result, probe.NewError(fmt.Errorf("Invalid combination of uploadID marker '%s' and marker '%s'", uploadIDMarker, keyMarker))
 		}
-
-		id, err := uuid.Parse(uploadIDMarker)
-
-		if err != nil {
-			return result, probe.NewError(err)
+		id, e := uuid.Parse(uploadIDMarker)
+		if e != nil {
+			return result, probe.NewError(e)
 		}
-
 		if id.IsZero() {
 			return result, probe.NewError(fmt.Errorf("Invalid upload ID marker %s", uploadIDMarker))
 		}
@@ -516,9 +513,16 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
 		recursive = false
 	}
 
-	bucketDir := filepath.Join(fs.path, bucket)
-	// If listMultipartObjectChannel is available for given parameters, then use it, else create new one
-	multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker})
+	bucketDir := filepath.Join(fs.path, configDir, bucket)
+	// Look up if a listMultipartObjectChannel is available for the given
+	// parameters, else create a new one.
+	multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{
+		bucket:         bucket,
+		delimiter:      delimiter,
+		keyMarker:      markerPath,
+		prefix:         prefixPath,
+		uploadIDMarker: uploadIDMarker,
+	})
 	if multipartObjectInfoCh == nil {
 		ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive)
 		multipartObjectInfoCh = &ch
@@ -534,10 +538,14 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
 		}
 
 		if multipartObjInfo.Err != nil {
+			if os.IsNotExist(multipartObjInfo.Err) {
+				return ListMultipartsInfo{}, nil
+			}
 			return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err)
 		}
 
-		if strings.Contains(multipartObjInfo.Name, "$multiparts") || strings.Contains(multipartObjInfo.Name, "$tmpobject") {
+		if strings.Contains(multipartObjInfo.Name, "$multiparts") ||
+			strings.Contains(multipartObjInfo.Name, "$tmpobject") {
 			continue
 		}
 
@@ -548,7 +556,11 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
 		if multipartObjInfo.IsDir {
 			result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name)
 		} else {
-			result.Uploads = append(result.Uploads, uploadMetadata{Object: multipartObjInfo.Name, UploadID: multipartObjInfo.UploadID, Initiated: multipartObjInfo.ModifiedTime})
+			result.Uploads = append(result.Uploads, uploadMetadata{
+				Object:    multipartObjInfo.Name,
+				UploadID:  multipartObjInfo.UploadID,
+				Initiated: multipartObjInfo.ModifiedTime,
+			})
 		}
 		nextKeyMarker = multipartObjInfo.Name
 		nextUploadIDMarker = multipartObjInfo.UploadID
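
For context, the helpers introduced above (safeWriteFile, safeRemoveFile) follow a write-to-temp-file-then-rename pattern with cleanup on failure. Below is a minimal, self-contained sketch of that pattern using only the standard library; the MD5 verification and the final rename are assumptions inferred from the safeWriteFile signature and are not shown in this diff, and writeFileSafely is a hypothetical stand-in, not the function from this change.

package sketch

import (
	"crypto/md5"
	"encoding/hex"
	"errors"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
)

// writeFileSafely streams `size` bytes into a temp file created beside
// fileName while hashing them, optionally verifies the hex-encoded MD5, and
// only then renames the temp file into place. On any failure the temp file is
// closed and removed, mirroring what safeRemoveFile does in the diff above.
func writeFileSafely(fileName string, data io.Reader, size int64, md5Hex string) error {
	tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-")
	if e != nil {
		return e
	}
	cleanup := func() {
		// Close and remove the partially written temp file.
		tempFile.Close()
		os.Remove(tempFile.Name())
	}
	hasher := md5.New()
	if _, e := io.CopyN(io.MultiWriter(hasher, tempFile), data, size); e != nil {
		cleanup()
		return e
	}
	if md5Hex != "" && hex.EncodeToString(hasher.Sum(nil)) != md5Hex {
		cleanup()
		return errors.New("md5sum mismatch")
	}
	if e := tempFile.Close(); e != nil {
		os.Remove(tempFile.Name())
		return e
	}
	return os.Rename(tempFile.Name(), fileName)
}

Writing through a temp file in the same directory keeps the final rename on the same filesystem (and therefore atomic on POSIX systems), so a failed or interrupted upload never leaves a partially written object behind.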
|
|
|
|