|
|
|
@ -17,7 +17,6 @@ |
|
|
|
|
package fs |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"crypto/md5" |
|
|
|
|
"encoding/base64" |
|
|
|
|
"encoding/hex" |
|
|
|
@ -116,28 +115,23 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa |
|
|
|
|
|
|
|
|
|
func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error { |
|
|
|
|
for _, part := range parts.Part { |
|
|
|
|
recvMD5 := part.ETag |
|
|
|
|
partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", part.PartNumber), os.O_RDONLY, 0600) |
|
|
|
|
partFile, e := os.OpenFile(objectPath+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) |
|
|
|
|
defer partFile.Close() |
|
|
|
|
if err != nil { |
|
|
|
|
return probe.NewError(err) |
|
|
|
|
if e != nil { |
|
|
|
|
return probe.NewError(e) |
|
|
|
|
} |
|
|
|
|
obj, err := ioutil.ReadAll(partFile) |
|
|
|
|
if err != nil { |
|
|
|
|
return probe.NewError(err) |
|
|
|
|
} |
|
|
|
|
calcMD5Bytes := md5.Sum(obj) |
|
|
|
|
|
|
|
|
|
recvMD5 := part.ETag |
|
|
|
|
// complete multipart request header md5sum per part is hex encoded
|
|
|
|
|
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) |
|
|
|
|
if err != nil { |
|
|
|
|
// trim it and decode if possible.
|
|
|
|
|
_, e = hex.DecodeString(strings.Trim(recvMD5, "\"")) |
|
|
|
|
if e != nil { |
|
|
|
|
return probe.NewError(InvalidDigest{Md5: recvMD5}) |
|
|
|
|
} |
|
|
|
|
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { |
|
|
|
|
return probe.NewError(BadDigest{Md5: recvMD5}) |
|
|
|
|
} |
|
|
|
|
_, err = io.Copy(mw, bytes.NewBuffer(obj)) |
|
|
|
|
if err != nil { |
|
|
|
|
return probe.NewError(err) |
|
|
|
|
|
|
|
|
|
_, e = io.Copy(mw, partFile) |
|
|
|
|
if e != nil { |
|
|
|
|
return probe.NewError(e) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
@ -275,8 +269,8 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
objectPath := filepath.Join(bucketPath, object) |
|
|
|
|
partPath := objectPath + fmt.Sprintf("$%d", partID) |
|
|
|
|
partFile, err := atomic.FileCreateWithPrefix(partPath, "") |
|
|
|
|
partPath := objectPath + fmt.Sprintf("$%d-$multiparts", partID) |
|
|
|
|
partFile, err := atomic.FileCreateWithPrefix(partPath, "$multiparts") |
|
|
|
|
if err != nil { |
|
|
|
|
return "", probe.NewError(err) |
|
|
|
|
} |
|
|
|
@ -415,7 +409,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da |
|
|
|
|
|
|
|
|
|
delete(fs.multiparts.ActiveSession, object) |
|
|
|
|
for _, part := range parts.Part { |
|
|
|
|
err = os.Remove(objectPath + fmt.Sprintf("$%d", part.PartNumber)) |
|
|
|
|
err = os.Remove(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber)) |
|
|
|
|
if err != nil { |
|
|
|
|
file.CloseAndPurge() |
|
|
|
|
return ObjectMetadata{}, probe.NewError(err) |
|
|
|
@ -546,7 +540,7 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob |
|
|
|
|
|
|
|
|
|
objectPath := filepath.Join(bucketPath, object) |
|
|
|
|
for _, part := range fs.multiparts.ActiveSession[object].Parts { |
|
|
|
|
err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber)) |
|
|
|
|
err = os.RemoveAll(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber)) |
|
|
|
|
if err != nil { |
|
|
|
|
return probe.NewError(err) |
|
|
|
|
} |
|
|
|
|