Merge pull request #1184 from harshavardhana/multipart

multipart: remove the full-object MD5; instead compute an MD5 based on the part checksums, to be S3 compatible.
master
Harshavardhana 9 years ago
commit 356b889e66
  1. 5
      api-response.go
  2. 2
      bucket-handlers.go
  3. 5
      object-handlers.go
  4. 5
      pkg/fs/api_suite_nix_test.go
  5. 5
      pkg/fs/api_suite_windows_test.go
  6. 64
      pkg/fs/fs-multipart.go

@ -218,6 +218,11 @@ type CompleteMultipartUploadResponse struct {
ETag string ETag string
} }
// getLocation get URL location.
func getLocation(r *http.Request) string {
return r.URL.Path
}
// takes an array of Bucketmetadata information for serialization // takes an array of Bucketmetadata information for serialization
// input: // input:
// array of bucket metadata // array of bucket metadata

@ -291,7 +291,7 @@ func (api storageAPI) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
// Make sure to add Location information here only for bucket // Make sure to add Location information here only for bucket
w.Header().Set("Location", "/"+bucket) w.Header().Set("Location", getLocation(r))
writeSuccessResponse(w, nil) writeSuccessResponse(w, nil)
} }

@ -798,7 +798,10 @@ func (api storageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, r *h
} }
return return
} }
response := generateCompleteMultpartUploadResponse(bucket, object, r.URL.String(), metadata.MD5) // get object location.
location := getLocation(r)
// Generate complete multipart response.
response := generateCompleteMultpartUploadResponse(bucket, object, location, metadata.MD5)
encodedSuccessResponse := encodeSuccessResponse(response) encodedSuccessResponse := encodeSuccessResponse(response)
// write headers // write headers
setCommonHeaders(w) setCommonHeaders(w)

@ -66,7 +66,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
completedParts := CompleteMultipartUpload{} completedParts := CompleteMultipartUpload{}
completedParts.Part = make([]CompletePart, 0) completedParts.Part = make([]CompletePart, 0)
finalHasher := md5.New()
for i := 1; i <= 10; i++ { for i := 1; i <= 10; i++ {
randomPerm := rand.Perm(10) randomPerm := rand.Perm(10)
randomString := "" randomString := ""
@ -75,7 +74,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
} }
hasher := md5.New() hasher := md5.New()
finalHasher.Write([]byte(randomString))
hasher.Write([]byte(randomString)) hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil)) expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
@ -87,12 +85,11 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex) c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum}) completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum})
} }
finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil))
completedPartsBytes, e := xml.Marshal(completedParts) completedPartsBytes, e := xml.Marshal(completedParts)
c.Assert(e, check.IsNil) c.Assert(e, check.IsNil)
objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil) objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objectMetadata.MD5, check.Equals, finalExpectedmd5SumHex) c.Assert(objectMetadata.MD5, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
} }
func testMultipartObjectAbort(c *check.C, create func() Filesystem) { func testMultipartObjectAbort(c *check.C, create func() Filesystem) {

@ -65,7 +65,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
completedParts := CompleteMultipartUpload{} completedParts := CompleteMultipartUpload{}
completedParts.Part = make([]CompletePart, 0) completedParts.Part = make([]CompletePart, 0)
finalHasher := md5.New()
for i := 1; i <= 10; i++ { for i := 1; i <= 10; i++ {
randomPerm := rand.Perm(10) randomPerm := rand.Perm(10)
randomString := "" randomString := ""
@ -74,7 +73,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
} }
hasher := md5.New() hasher := md5.New()
finalHasher.Write([]byte(randomString))
hasher.Write([]byte(randomString)) hasher.Write([]byte(randomString))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil)) expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
@ -86,12 +84,11 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) {
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex) c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum}) completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum})
} }
finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil))
completedPartsBytes, e := xml.Marshal(completedParts) completedPartsBytes, e := xml.Marshal(completedParts)
c.Assert(e, check.IsNil) c.Assert(e, check.IsNil)
objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil) objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objectMetadata.MD5, check.Equals, finalExpectedmd5SumHex) c.Assert(objectMetadata.MD5, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
} }
func testMultipartObjectAbort(c *check.C, create func() Filesystem) { func testMultipartObjectAbort(c *check.C, create func() Filesystem) {

@ -141,6 +141,22 @@ func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
return true return true
} }
// makeS3MD5 builds an S3-compatible ETag for a completed multipart upload:
// the hex-encoded MD5 of the concatenated binary MD5 digests of each part,
// suffixed with "-<number of parts>" (e.g. "9b7d...891b-10").
//
// Each element of md5Strs must be a hex-encoded part digest; a malformed
// string yields a wrapped decode error.
func makeS3MD5(md5Strs ...string) (string, *probe.Error) {
	// MD5 is a streaming hash, so feeding each decoded part digest into the
	// hasher is equivalent to hashing their concatenation in one shot.
	hasher := md5.New()
	for _, partMD5 := range md5Strs {
		decoded, e := hex.DecodeString(partMD5)
		if e != nil {
			return "", probe.NewError(e)
		}
		hasher.Write(decoded)
	}
	s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil)), len(md5Strs))
	return s3MD5, nil
}
type multiCloser struct { type multiCloser struct {
Closers []io.Closer Closers []io.Closer
} }
@ -177,7 +193,9 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe
var partReaders []io.Reader var partReaders []io.Reader
var partClosers []io.Closer var partClosers []io.Closer
for _, part := range parts { for _, part := range parts {
// Trim prefix
md5Sum := strings.TrimPrefix(part.ETag, "\"") md5Sum := strings.TrimPrefix(part.ETag, "\"")
// Trim suffix
md5Sum = strings.TrimSuffix(md5Sum, "\"") md5Sum = strings.TrimSuffix(md5Sum, "\"")
partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
if e != nil { if e != nil {
@ -199,7 +217,8 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe
defer closer.Close() defer closer.Close()
reader := io.MultiReader(partReaders...) reader := io.MultiReader(partReaders...)
readBuffer := make([]byte, 4*1024*1024) readBufferSize := 8 * 1024 * 1024 // 8MiB
readBuffer := make([]byte, readBufferSize) // Allocate 8MiB buffer.
if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil { if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil {
return probe.NewError(e) return probe.NewError(e)
} }
@ -465,16 +484,14 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
} }
objectPath := filepath.Join(bucketPath, object) objectPath := filepath.Join(bucketPath, object)
file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject") objectWriter, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
if e != nil { if e != nil {
return ObjectMetadata{}, probe.NewError(e) return ObjectMetadata{}, probe.NewError(e)
} }
md5Hasher := md5.New()
objectWriter := io.MultiWriter(file, md5Hasher)
partBytes, e := ioutil.ReadAll(data) partBytes, e := ioutil.ReadAll(data)
if e != nil { if e != nil {
file.CloseAndPurge() objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(e) return ObjectMetadata{}, probe.NewError(e)
} }
if signature != nil { if signature != nil {
@ -482,21 +499,21 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
sh.Write(partBytes) sh.Write(partBytes)
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil))) ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
if err != nil { if err != nil {
file.CloseAndPurge() objectWriter.CloseAndPurge()
return ObjectMetadata{}, err.Trace() return ObjectMetadata{}, err.Trace()
} }
if !ok { if !ok {
file.CloseAndPurge() objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(SignDoesNotMatch{}) return ObjectMetadata{}, probe.NewError(SignDoesNotMatch{})
} }
} }
completeMultipartUpload := &CompleteMultipartUpload{} completeMultipartUpload := &CompleteMultipartUpload{}
if e = xml.Unmarshal(partBytes, completeMultipartUpload); e != nil { if e = xml.Unmarshal(partBytes, completeMultipartUpload); e != nil {
file.CloseAndPurge() objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(MalformedXML{}) return ObjectMetadata{}, probe.NewError(MalformedXML{})
} }
if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) { if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) {
file.CloseAndPurge() objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(InvalidPartOrder{}) return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
} }
@ -509,28 +526,42 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
fs.rwLock.RUnlock() fs.rwLock.RUnlock()
if !doPartsMatch(parts, savedParts) { if !doPartsMatch(parts, savedParts) {
file.CloseAndPurge() objectWriter.CloseAndPurge()
return ObjectMetadata{}, probe.NewError(InvalidPart{}) return ObjectMetadata{}, probe.NewError(InvalidPart{})
} }
// Parts successfully validated, save all the parts. // Parts successfully validated, save all the parts.
partPathPrefix := objectPath + uploadID partPathPrefix := objectPath + uploadID
if err := saveParts(partPathPrefix, objectWriter, parts); err != nil { if err := saveParts(partPathPrefix, objectWriter, parts); err != nil {
file.CloseAndPurge() objectWriter.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix) return ObjectMetadata{}, err.Trace(partPathPrefix)
} }
// Successfully saved, remove all parts. var md5Strs []string
removeParts(partPathPrefix, savedParts) for _, part := range savedParts {
md5Strs = append(md5Strs, part.ETag)
}
// Save the s3 md5.
s3MD5, err := makeS3MD5(md5Strs...)
if err != nil {
objectWriter.CloseAndPurge()
return ObjectMetadata{}, err.Trace(md5Strs...)
}
// Successfully saved multipart, remove all parts in a routine.
go removeParts(partPathPrefix, savedParts)
// Critical region requiring write lock. // Critical region requiring write lock.
fs.rwLock.Lock() fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, uploadID) delete(fs.multiparts.ActiveSession, uploadID)
if err := saveMultipartsSession(*fs.multiparts); err != nil { if err := saveMultipartsSession(*fs.multiparts); err != nil {
fs.rwLock.Unlock() fs.rwLock.Unlock()
file.CloseAndPurge() objectWriter.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix) return ObjectMetadata{}, err.Trace(partPathPrefix)
} }
file.Close() if e = objectWriter.Close(); e != nil {
fs.rwLock.Unlock()
return ObjectMetadata{}, probe.NewError(e)
}
fs.rwLock.Unlock() fs.rwLock.Unlock()
// Send stat again to get object metadata. // Send stat again to get object metadata.
@ -538,6 +569,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
if e != nil { if e != nil {
return ObjectMetadata{}, probe.NewError(e) return ObjectMetadata{}, probe.NewError(e)
} }
contentType := "application/octet-stream" contentType := "application/octet-stream"
if objectExt := filepath.Ext(objectPath); objectExt != "" { if objectExt := filepath.Ext(objectPath); objectExt != "" {
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))] content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
@ -551,7 +583,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
LastModified: st.ModTime(), LastModified: st.ModTime(),
Size: st.Size(), Size: st.Size(),
ContentType: contentType, ContentType: contentType,
MD5: hex.EncodeToString(md5Hasher.Sum(nil)), MD5: s3MD5,
} }
return newObject, nil return newObject, nil
} }

Loading…
Cancel
Save