fs: Filter out $multiparts properly.

Relax md5 requirement during complete multipart upload - ref #977
master
Harshavardhana 9 years ago
parent 2392252a7c
commit a328120e4d
  1. 6
      pkg/fs/fs-bucket-listobjects.go
  2. 40
      pkg/fs/fs-multipart.go

@ -69,6 +69,9 @@ func (fs Filesystem) ListObjects(bucket string, resources BucketResourcesMetadat
return nil, resources, probe.NewError(err) return nil, resources, probe.NewError(err)
} }
for _, fl := range files { for _, fl := range files {
if strings.HasSuffix(fl.Name(), "$multiparts") {
continue
}
p.files = append(p.files, contentInfo{ p.files = append(p.files, contentInfo{
Prefix: fl.Name(), Prefix: fl.Name(),
Size: fl.Size(), Size: fl.Size(),
@ -105,6 +108,9 @@ func (fs Filesystem) ListObjects(bucket string, resources BucketResourcesMetadat
return nil, resources, probe.NewError(err) return nil, resources, probe.NewError(err)
} }
for _, fl := range files { for _, fl := range files {
if strings.HasSuffix(fl.Name(), "$multiparts") {
continue
}
prefix := fl.Name() prefix := fl.Name()
if resources.Prefix != "" { if resources.Prefix != "" {
prefix = filepath.Join(resources.Prefix, fl.Name()) prefix = filepath.Join(resources.Prefix, fl.Name())

@ -17,7 +17,6 @@
package fs package fs
import ( import (
"bytes"
"crypto/md5" "crypto/md5"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
@ -116,28 +115,23 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error { func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error {
for _, part := range parts.Part { for _, part := range parts.Part {
recvMD5 := part.ETag partFile, e := os.OpenFile(objectPath+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", part.PartNumber), os.O_RDONLY, 0600)
defer partFile.Close() defer partFile.Close()
if err != nil { if e != nil {
return probe.NewError(err) return probe.NewError(e)
} }
obj, err := ioutil.ReadAll(partFile)
if err != nil { recvMD5 := part.ETag
return probe.NewError(err) // complete multipart request header md5sum per part is hex encoded
} // trim it and decode if possible.
calcMD5Bytes := md5.Sum(obj) _, e = hex.DecodeString(strings.Trim(recvMD5, "\""))
// complete multi part request header md5sum per part is hex encoded if e != nil {
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
return probe.NewError(InvalidDigest{Md5: recvMD5}) return probe.NewError(InvalidDigest{Md5: recvMD5})
} }
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
return probe.NewError(BadDigest{Md5: recvMD5}) _, e = io.Copy(mw, partFile)
} if e != nil {
_, err = io.Copy(mw, bytes.NewBuffer(obj)) return probe.NewError(e)
if err != nil {
return probe.NewError(err)
} }
} }
return nil return nil
@ -275,8 +269,8 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
} }
objectPath := filepath.Join(bucketPath, object) objectPath := filepath.Join(bucketPath, object)
partPath := objectPath + fmt.Sprintf("$%d", partID) partPath := objectPath + fmt.Sprintf("$%d-$multiparts", partID)
partFile, err := atomic.FileCreateWithPrefix(partPath, "") partFile, err := atomic.FileCreateWithPrefix(partPath, "$multiparts")
if err != nil { if err != nil {
return "", probe.NewError(err) return "", probe.NewError(err)
} }
@ -415,7 +409,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
delete(fs.multiparts.ActiveSession, object) delete(fs.multiparts.ActiveSession, object)
for _, part := range parts.Part { for _, part := range parts.Part {
err = os.Remove(objectPath + fmt.Sprintf("$%d", part.PartNumber)) err = os.Remove(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
if err != nil { if err != nil {
file.CloseAndPurge() file.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(err) return ObjectMetadata{}, probe.NewError(err)
@ -546,7 +540,7 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
objectPath := filepath.Join(bucketPath, object) objectPath := filepath.Join(bucketPath, object)
for _, part := range fs.multiparts.ActiveSession[object].Parts { for _, part := range fs.multiparts.ActiveSession[object].Parts {
err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber)) err = os.RemoveAll(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
if err != nil { if err != nil {
return probe.NewError(err) return probe.NewError(err)
} }

Loading…
Cancel
Save