PutObject: object layer now returns ObjectInfo instead of md5sum to avoid extra GetObjectInfo call. (#2599)

From the S3 layer after PutObject we were calling GetObjectInfo for bucket notification. This can
be avoided if PutObjectInfo returns ObjectInfo.

fixes #2567
master
Krishna Srinivas 8 years ago committed by Harshavardhana
parent 92e49eab5a
commit b4e4846e9f
  1. 38
      cmd/benchmark-utils_test.go
  2. 16
      cmd/bucket-handlers.go
  3. 61
      cmd/fs-v1.go
  4. 14
      cmd/object-api-putobject_test.go
  5. 37
      cmd/object-handlers.go
  6. 2
      cmd/object-interface.go
  7. 6
      cmd/object_api_suite_test.go
  8. 40
      cmd/xl-v1-object.go

@ -54,9 +54,6 @@ func runPutObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) {
b.Fatal(err)
}
// PutObject returns md5Sum of the object inserted.
// md5Sum variable is assigned with that value.
var md5Sum string
// get text data generated for number of bytes equal to object size.
textData := generateBytesData(objSize)
// generate md5sum for the generated data.
@ -71,12 +68,12 @@ func runPutObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
// insert the object.
md5Sum, err = obj.PutObject(bucket, "object"+strconv.Itoa(i), int64(len(textData)), bytes.NewBuffer(textData), metadata)
objInfo, err := obj.PutObject(bucket, "object"+strconv.Itoa(i), int64(len(textData)), bytes.NewBuffer(textData), metadata)
if err != nil {
b.Fatal(err)
}
if md5Sum != metadata["md5Sum"] {
b.Fatalf("Write no: %d: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", i+1, md5Sum, metadata["md5Sum"])
if objInfo.MD5Sum != metadata["md5Sum"] {
b.Fatalf("Write no: %d: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", i+1, objInfo.MD5Sum, metadata["md5Sum"])
}
}
// Benchmark ends here. Stop timer.
@ -197,9 +194,6 @@ func runGetObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) {
b.Fatal(err)
}
// PutObject returns md5Sum of the object inserted.
// md5Sum variable is assigned with that value.
var md5Sum string
for i := 0; i < 10; i++ {
// get text data generated for number of bytes equal to object size.
textData := generateBytesData(objSize)
@ -211,12 +205,13 @@ func runGetObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) {
metadata := make(map[string]string)
metadata["md5Sum"] = hex.EncodeToString(hasher.Sum(nil))
// insert the object.
md5Sum, err = obj.PutObject(bucket, "object"+strconv.Itoa(i), int64(len(textData)), bytes.NewBuffer(textData), metadata)
var objInfo ObjectInfo
objInfo, err = obj.PutObject(bucket, "object"+strconv.Itoa(i), int64(len(textData)), bytes.NewBuffer(textData), metadata)
if err != nil {
b.Fatal(err)
}
if md5Sum != metadata["md5Sum"] {
b.Fatalf("Write no: %d: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", i+1, md5Sum, metadata["md5Sum"])
if objInfo.MD5Sum != metadata["md5Sum"] {
b.Fatalf("Write no: %d: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", i+1, objInfo.MD5Sum, metadata["md5Sum"])
}
}
@ -291,9 +286,6 @@ func runPutObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) {
b.Fatal(err)
}
// PutObject returns md5Sum of the object inserted.
// md5Sum variable is assigned with that value.
var md5Sum string
// get text data generated for number of bytes equal to object size.
textData := generateBytesData(objSize)
// generate md5sum for the generated data.
@ -311,12 +303,12 @@ func runPutObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) {
i := 0
for pb.Next() {
// insert the object.
md5Sum, err = obj.PutObject(bucket, "object"+strconv.Itoa(i), int64(len(textData)), bytes.NewBuffer(textData), metadata)
objInfo, err := obj.PutObject(bucket, "object"+strconv.Itoa(i), int64(len(textData)), bytes.NewBuffer(textData), metadata)
if err != nil {
b.Fatal(err)
}
if md5Sum != metadata["md5Sum"] {
b.Fatalf("Write no: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", md5Sum, metadata["md5Sum"])
if objInfo.MD5Sum != metadata["md5Sum"] {
b.Fatalf("Write no: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", objInfo.MD5Sum, metadata["md5Sum"])
}
i++
}
@ -338,9 +330,6 @@ func runGetObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) {
b.Fatal(err)
}
// PutObject returns md5Sum of the object inserted.
// md5Sum variable is assigned with that value.
var md5Sum string
for i := 0; i < 10; i++ {
// get text data generated for number of bytes equal to object size.
textData := generateBytesData(objSize)
@ -352,12 +341,13 @@ func runGetObjectBenchmarkParallel(b *testing.B, obj ObjectLayer, objSize int) {
metadata := make(map[string]string)
metadata["md5Sum"] = hex.EncodeToString(hasher.Sum(nil))
// insert the object.
md5Sum, err = obj.PutObject(bucket, "object"+strconv.Itoa(i), int64(len(textData)), bytes.NewBuffer(textData), metadata)
var objInfo ObjectInfo
objInfo, err = obj.PutObject(bucket, "object"+strconv.Itoa(i), int64(len(textData)), bytes.NewBuffer(textData), metadata)
if err != nil {
b.Fatal(err)
}
if md5Sum != metadata["md5Sum"] {
b.Fatalf("Write no: %d: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", i+1, md5Sum, metadata["md5Sum"])
if objInfo.MD5Sum != metadata["md5Sum"] {
b.Fatalf("Write no: %d: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", i+1, objInfo.MD5Sum, metadata["md5Sum"])
}
}

@ -21,7 +21,6 @@ import (
"io"
"net/http"
"net/url"
"path"
"strings"
"sync"
@ -427,17 +426,13 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
metadata := make(map[string]string)
// Nothing to store right now.
md5Sum, err := objectAPI.PutObject(bucket, object, -1, fileBody, metadata)
objInfo, err := objectAPI.PutObject(bucket, object, -1, fileBody, metadata)
if err != nil {
errorIf(err, "Unable to create object.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
}
if md5Sum != "" {
w.Header().Set("ETag", "\""+md5Sum+"\"")
}
// TODO full URL is preferred.
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
w.Header().Set("Location", getObjectLocation(bucket, object))
// Set common headers.
@ -447,13 +442,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
writeSuccessNoContent(w)
if globalEventNotifier.IsBucketNotificationSet(bucket) {
// Fetch object info for notifications.
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
}
// Notify object created event.
eventNotify(eventData{
Type: ObjectCreatedPost,

@ -311,16 +311,8 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
return toObjectErr(err, bucket, object)
}
// GetObjectInfo - get object info.
func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// getObjectInfo - get object info.
func (fs fsObjects) getObjectInfo(bucket, object string) (ObjectInfo, error) {
fi, err := fs.storage.StatFile(bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
@ -358,14 +350,27 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
}, nil
}
// GetObjectInfo - get object info.
func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return fs.getObjectInfo(bucket, object)
}
// PutObject - create an object.
func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", traceError(BucketNameInvalid{Bucket: bucket})
return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", traceError(ObjectNameInvalid{
return ObjectInfo{}, traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
@ -397,9 +402,9 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
if size == 0 {
// For size 0 we write a 0byte file.
err := fs.storage.AppendFile(minioMetaBucket, tempObj, []byte(""))
err = fs.storage.AppendFile(minioMetaBucket, tempObj, []byte(""))
if err != nil {
return "", toObjectErr(traceError(err), bucket, object)
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
}
} else {
// Allocate a buffer to Read() from request body
@ -409,17 +414,18 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
}
buf := make([]byte, int(bufSize))
teeReader := io.TeeReader(limitDataReader, md5Writer)
bytesWritten, err := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tempObj)
var bytesWritten int64
bytesWritten, err = fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tempObj)
if err != nil {
fs.storage.DeleteFile(minioMetaBucket, tempObj)
return "", toObjectErr(err, bucket, object)
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
}
// Should return IncompleteBody{} error when reader has fewer
// bytes than specified in request header.
if bytesWritten < size {
fs.storage.DeleteFile(minioMetaBucket, tempObj)
return "", traceError(IncompleteBody{})
return ObjectInfo{}, traceError(IncompleteBody{})
}
}
@ -435,7 +441,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// Incoming payload wrong, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tempObj)
// Error return.
return "", toObjectErr(traceError(vErr), bucket, object)
return ObjectInfo{}, toObjectErr(traceError(vErr), bucket, object)
}
}
@ -446,14 +452,14 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
// MD5 mismatch, delete the temporary object.
fs.storage.DeleteFile(minioMetaBucket, tempObj)
// Returns md5 mismatch.
return "", traceError(BadDigest{md5Hex, newMD5Hex})
return ObjectInfo{}, traceError(BadDigest{md5Hex, newMD5Hex})
}
}
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
err := fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
return "", toObjectErr(traceError(err), bucket, object)
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
}
// Save additional metadata only if extended headers such as "X-Amz-Meta-" are set.
@ -464,12 +470,15 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, bucket, object)
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
}
}
// Return md5sum, successfully wrote object.
return newMD5Hex, nil
objInfo, err = fs.getObjectInfo(bucket, object)
if err == nil {
// If MINIO_ENABLE_FSMETA is not enabled objInfo.MD5Sum will be empty.
objInfo.MD5Sum = newMD5Hex
}
return objInfo, err
}
// DeleteObject - deletes an object from a bucket, this operation is destructive

@ -151,7 +151,7 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl
}
for i, testCase := range testCases {
actualMd5Hex, actualErr := obj.PutObject(testCase.bucketName, testCase.objName, testCase.intputDataSize, bytes.NewReader(testCase.inputData), testCase.inputMeta)
objInfo, actualErr := obj.PutObject(testCase.bucketName, testCase.objName, testCase.intputDataSize, bytes.NewReader(testCase.inputData), testCase.inputMeta)
actualErr = errorCause(actualErr)
if actualErr != nil && testCase.expectedError == nil {
t.Errorf("Test %d: %s: Expected to pass, but failed with: error %s.", i+1, instanceType, actualErr.Error())
@ -166,8 +166,8 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl
// Test passes as expected, but the output values are verified for correctness here.
if actualErr == nil {
// Asserting whether the md5 output is correct.
if expectedMD5, ok := testCase.inputMeta["md5Sum"]; ok && expectedMD5 != actualMd5Hex {
t.Errorf("Test %d: %s: Calculated Md5 different from the actual one %s.", i+1, instanceType, actualMd5Hex)
if expectedMD5, ok := testCase.inputMeta["md5Sum"]; ok && expectedMD5 != objInfo.MD5Sum {
t.Errorf("Test %d: %s: Calculated Md5 different from the actual one %s.", i+1, instanceType, objInfo.MD5Sum)
}
}
}
@ -224,7 +224,8 @@ func testObjectAPIPutObjectDiskNotFOund(obj ObjectLayer, instanceType string, di
}
for i, testCase := range testCases {
actualMd5Hex, actualErr := obj.PutObject(testCase.bucketName, testCase.objName, testCase.intputDataSize, bytes.NewReader(testCase.inputData), testCase.inputMeta)
objInfo, actualErr := obj.PutObject(testCase.bucketName, testCase.objName, testCase.intputDataSize, bytes.NewReader(testCase.inputData), testCase.inputMeta)
actualErr = errorCause(err)
if actualErr != nil && testCase.shouldPass {
t.Errorf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s.", i+1, instanceType, actualErr.Error())
}
@ -241,8 +242,8 @@ func testObjectAPIPutObjectDiskNotFOund(obj ObjectLayer, instanceType string, di
// Test passes as expected, but the output values are verified for correctness here.
if actualErr == nil && testCase.shouldPass {
// Asserting whether the md5 output is correct.
if testCase.inputMeta["md5Sum"] != actualMd5Hex {
t.Errorf("Test %d: %s: Calculated Md5 different from the actual one %s.", i+1, instanceType, actualMd5Hex)
if testCase.inputMeta["md5Sum"] != objInfo.MD5Sum {
t.Errorf("Test %d: %s: Calculated Md5 different from the actual one %s.", i+1, instanceType, objInfo.MD5Sum)
}
}
}
@ -273,6 +274,7 @@ func testObjectAPIPutObjectDiskNotFOund(obj ObjectLayer, instanceType string, di
InsufficientWriteQuorum{},
}
_, actualErr := obj.PutObject(testCase.bucketName, testCase.objName, testCase.intputDataSize, bytes.NewReader(testCase.inputData), testCase.inputMeta)
actualErr = errorCause(actualErr)
if actualErr != nil && testCase.shouldPass {
t.Errorf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s.", len(testCases)+1, instanceType, actualErr.Error())
}

@ -333,11 +333,14 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return
}
// Size of object.
size := objInfo.Size
pipeReader, pipeWriter := io.Pipe()
go func() {
startOffset := int64(0) // Read the whole file.
// Get the object.
gErr := objectAPI.GetObject(sourceBucket, sourceObject, startOffset, objInfo.Size, pipeWriter)
gErr := objectAPI.GetObject(sourceBucket, sourceObject, startOffset, size, pipeWriter)
if gErr != nil {
errorIf(gErr, "Unable to read an object.")
pipeWriter.CloseWithError(gErr)
@ -346,9 +349,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
pipeWriter.Close() // Close.
}()
// Size of object.
size := objInfo.Size
// Save other metadata if available.
metadata := objInfo.UserDefined
@ -356,7 +356,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// same md5sum as the source.
// Create the object.
md5Sum, err := objectAPI.PutObject(bucket, object, size, pipeReader, metadata)
objInfo, err = objectAPI.PutObject(bucket, object, size, pipeReader, metadata)
if err != nil {
// Close the this end of the pipe upon error in PutObject.
pipeReader.CloseWithError(err)
@ -367,13 +367,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// Explicitly close the reader, before fetching object info.
pipeReader.Close()
objInfo, err = objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
}
md5Sum := objInfo.MD5Sum
response := generateCopyObjectResponse(md5Sum, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
// write headers
@ -448,7 +442,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
// Make sure we hex encode md5sum here.
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)
var md5Sum string
var objInfo ObjectInfo
switch rAuthType {
default:
// For all unknown auth types return error.
@ -461,7 +455,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
// Create anonymous object.
md5Sum, err = objectAPI.PutObject(bucket, object, size, r.Body, metadata)
objInfo, err = objectAPI.PutObject(bucket, object, size, r.Body, metadata)
case authTypeStreamingSigned:
// Initialize stream signature verifier.
reader, s3Error := newSignV4ChunkedReader(r)
@ -469,31 +463,22 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(w, r, s3Error, r.URL.Path)
return
}
md5Sum, err = objectAPI.PutObject(bucket, object, size, reader, metadata)
objInfo, err = objectAPI.PutObject(bucket, object, size, reader, metadata)
case authTypePresigned, authTypeSigned:
// Initialize signature verifier.
reader := newSignVerify(r)
// Create object.
md5Sum, err = objectAPI.PutObject(bucket, object, size, reader, metadata)
objInfo, err = objectAPI.PutObject(bucket, object, size, reader, metadata)
}
if err != nil {
errorIf(err, "Unable to create an object.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
}
if md5Sum != "" {
w.Header().Set("ETag", "\""+md5Sum+"\"")
}
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
writeSuccessResponse(w, nil)
if globalEventNotifier.IsBucketNotificationSet(bucket) {
// Fetch object info for notifications.
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
}
// Notify object created event.
eventNotify(eventData{
Type: ObjectCreatedPut,

@ -36,7 +36,7 @@ type ObjectLayer interface {
// Object operations.
GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error)
GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error)
PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string) (md5 string, err error)
PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string) (objInto ObjectInfo, err error)
DeleteObject(bucket, object string) error
HealObject(bucket, object string) error

@ -200,12 +200,12 @@ func testMultipleObjectCreation(obj ObjectLayer, instanceType string, c TestErrH
objects[key] = []byte(randomString)
metadata := make(map[string]string)
metadata["md5Sum"] = expectedMD5Sumhex
var md5Sum string
md5Sum, err = obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata)
var objInfo ObjectInfo
objInfo, err = obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata)
if err != nil {
c.Fatalf("%s: <ERROR> %s", instanceType, err)
}
if md5Sum != expectedMD5Sumhex {
if objInfo.MD5Sum != expectedMD5Sumhex {
c.Errorf("Md5 Mismatch")
}
}

@ -492,17 +492,17 @@ func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject
// until EOF, erasure codes the data across all disk and additionally
// writes `xl.json` which carries the necessary metadata for future
// object operations.
func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (md5Sum string, err error) {
func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", traceError(BucketNameInvalid{Bucket: bucket})
return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket})
}
// Verify bucket exists.
if !xl.isBucketExist(bucket) {
return "", traceError(BucketNotFound{Bucket: bucket})
return ObjectInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", traceError(ObjectNameInvalid{
return ObjectInfo{}, traceError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
@ -538,7 +538,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Ignore error if cache is full, proceed to write the object.
if err != nil && err != objcache.ErrCacheFull {
// For any other error return here.
return "", toObjectErr(traceError(err), bucket, object)
return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
}
} else {
mw = md5Writer
@ -568,14 +568,14 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
if err != nil {
// Create file failed, delete temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, tempErasureObj)
}
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if sizeWritten < size {
// Short write, delete temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
return "", IncompleteBody{}
return ObjectInfo{}, traceError(IncompleteBody{})
}
// For size == -1, perhaps client is sending in chunked encoding
@ -608,7 +608,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Incoming payload wrong, delete the temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
// Error return.
return "", toObjectErr(vErr, bucket, object)
return ObjectInfo{}, toObjectErr(traceError(vErr), bucket, object)
}
}
@ -619,7 +619,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// MD5 mismatch, delete the temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
// Returns md5 mismatch.
return "", BadDigest{md5Hex, newMD5Hex}
return ObjectInfo{}, traceError(BadDigest{md5Hex, newMD5Hex})
}
}
@ -636,7 +636,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
if xl.parentDirIsObject(bucket, path.Dir(object)) {
// Parent (in the namespace) is an object, delete temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj)
return "", toObjectErr(traceError(errFileAccessDenied), bucket, object)
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
// Rename if an object already exists to temporary location.
@ -647,7 +647,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// regardless of `xl.json` status and rolled back in case of errors.
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum)
if err != nil {
return "", toObjectErr(err, bucket, object)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
}
@ -672,13 +672,13 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Write unique `xl.json` for each disk.
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil {
return "", toObjectErr(err, bucket, object)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Rename the successfully written temporary object to final location.
err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum)
if err != nil {
return "", toObjectErr(err, bucket, object)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Delete the temporary object.
@ -690,8 +690,18 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
newBuffer.Close()
}
// Return md5sum, successfully wrote object.
return newMD5Hex, nil
objInfo = ObjectInfo{
IsDir: false,
Bucket: bucket,
Name: object,
Size: xlMeta.Stat.Size,
ModTime: xlMeta.Stat.ModTime,
MD5Sum: xlMeta.Meta["md5Sum"],
ContentType: xlMeta.Meta["content-type"],
ContentEncoding: xlMeta.Meta["content-encoding"],
UserDefined: xlMeta.Meta,
}
return objInfo, nil
}
// deleteObject - wrapper for delete object, deletes an object from

Loading…
Cancel
Save