@@ -22,6 +22,7 @@ import (
 	"hash"
 	"io"
 	"path"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -461,7 +462,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	}

 	uniqueID := mustGetUUID()
-	tempErasureObj := path.Join(uniqueID, "part.1")
 	tempObj := uniqueID

 	// Initialize md5 writer.
@@ -512,40 +512,101 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	// Tee reader combines incoming data stream and md5, data read from input stream is written to md5.
 	teeReader := io.TeeReader(limitDataReader, mw)

-	// Initialize xl meta.
+	// Initialize parts metadata.
+	partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
+
 	xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)

-	onlineDisks := getOrderedDisks(xlMeta.Erasure.Distribution, xl.storageDisks)
+	// Initialize xl meta for every disk.
+	for index := range partsMetadata {
+		partsMetadata[index] = xlMeta
+	}
+
+	// Order disks according to the erasure distribution.
+	onlineDisks := getOrderedDisks(partsMetadata[0].Erasure.Distribution, xl.storageDisks)

 	// Delete temporary object in the event of failure.
 	// If PutObject succeeded there would be no temporary
 	// object to delete.
 	defer xl.deleteObject(minioMetaTmpBucket, tempObj)

-	if size > 0 {
-		for _, disk := range onlineDisks {
-			if disk != nil {
-				actualSize := xl.sizeOnDisk(size, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
-				disk.PrepareFile(minioMetaTmpBucket, tempErasureObj, actualSize)
+	// Total size of the written object.
+	sizeWritten := int64(0)
+
+	// Read data and split into parts - similar to the multipart mechanism.
+	for partIdx := 1; ; partIdx++ {
+		// Compute the part name.
+		partName := "part." + strconv.Itoa(partIdx)
+		// Compute the path of the current part.
+		tempErasureObj := path.Join(uniqueID, partName)
+
+		// Calculate the size of the current part; if size is unknown, curPartSize will be unknown too.
+		// allowEmptyPart will always be true if this is the first part and false otherwise.
+		curPartSize := getPartSizeFromIdx(size, globalPutPartSize, partIdx)
+
+		// Prepare the file for an eventual optimization on the disk.
+		if curPartSize > 0 {
+			// Calculate the real size of the part on the disk and prepare it for an eventual optimization.
+			actualSize := xl.sizeOnDisk(curPartSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
+			for _, disk := range onlineDisks {
+				if disk != nil {
+					disk.PrepareFile(minioMetaTmpBucket, tempErasureObj, actualSize)
+				}
 			}
 		}
-	}

-	// Erasure code data and write across all disks.
-	sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
-	if err != nil {
-		return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, tempErasureObj)
-	}
-	// Should return IncompleteBody{} error when reader has fewer bytes
-	// than specified in request header.
-	if sizeWritten < size {
-		return ObjectInfo{}, traceError(IncompleteBody{})
-	}
+		// partReader streams at most the maximum part size.
+		partReader := io.LimitReader(teeReader, globalPutPartSize)
+
+		// Allow creating an empty erasure file only when this is the first part. This flag is useful
+		// when size == -1 because in this case, we are not able to predict how many parts we will have.
+		allowEmptyPart := partIdx == 1
+
+		// Erasure code data and write across all disks.
+		partSizeWritten, checkSums, erasureErr := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, partReader, allowEmptyPart, partsMetadata[0].Erasure.BlockSize, partsMetadata[0].Erasure.DataBlocks, partsMetadata[0].Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
+		if erasureErr != nil {
+			return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
+		}
+
+		// Should return IncompleteBody{} error when reader has fewer bytes
+		// than specified in request header.
+		if partSizeWritten < int64(curPartSize) {
+			return ObjectInfo{}, traceError(IncompleteBody{})
+		}
+
+		// Update the total written size.
+		sizeWritten += partSizeWritten
+
+		// If erasure stored some data in this iteration or created an empty file.
+		if partSizeWritten > 0 || allowEmptyPart {
+			for index := range partsMetadata {
+				// Add the part to xl.json.
+				partsMetadata[index].AddObjectPart(partIdx, partName, "", partSizeWritten)
+				// Add part checksum info to xl.json.
+				partsMetadata[index].Erasure.AddCheckSumInfo(checkSumInfo{
+					Name:      partName,
+					Hash:      checkSums[index],
+					Algorithm: bitRotAlgo,
+				})
+			}
+		}
+
+		// If we didn't write anything, or we know that the next part doesn't have any
+		// data to write, we should quit this loop immediately.
+		if partSizeWritten == 0 || getPartSizeFromIdx(size, globalPutPartSize, partIdx+1) == 0 {
+			break
+		}
+	}
+
+	// For size == -1, perhaps the client is sending in chunked encoding;
+	// set the size to the size that was actually written.
+	if size == -1 {
+		size = sizeWritten
+	} else {
+		// Check if the stored data satisfies what was asked for.
+		if sizeWritten < size {
+			return ObjectInfo{}, traceError(IncompleteBody{})
+		}
+	}

 	// Save additional erasureMetadata.
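
Note: the loop above leans on getPartSizeFromIdx for both part sizing and its exit condition, but the helper itself is outside this diff. The following is only a sketch of the contract the new code appears to assume (names and rounding are inferred here, not taken from the repository):

// Sketch, not the actual helper: for an unknown total size (-1) the part
// size is unknown too; otherwise parts are cut at partSize boundaries,
// the final part may be short, and any index past the end yields 0,
// which is what terminates the PutObject loop above.
func getPartSizeFromIdx(size, partSize int64, partIdx int) int64 {
	if size == -1 {
		return -1 // unknown stream length, unknown part size
	}
	offset := partSize * int64(partIdx-1) // byte offset where this part starts
	if offset >= size {
		return 0 // no data left for this part index
	}
	if remaining := size - offset; remaining < partSize {
		return remaining // short final part
	}
	return partSize
}

Under that contract the curPartSize > 0 guard, the allowEmptyPart escape hatch for zero-byte objects, and the getPartSizeFromIdx(size, globalPutPartSize, partIdx+1) == 0 break all agree with the comments in the hunk.
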
@@ -604,22 +665,11 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	}

 	// Fill all the necessary metadata.
-	xlMeta.Meta = metadata
-	xlMeta.Stat.Size = size
-	xlMeta.Stat.ModTime = modTime
-
-	// Add the final part.
-	xlMeta.AddObjectPart(1, "part.1", newMD5Hex, xlMeta.Stat.Size)
-
-	partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
 	// Update `xl.json` content on each disks.
 	for index := range partsMetadata {
-		partsMetadata[index] = xlMeta
-		partsMetadata[index].Erasure.AddCheckSumInfo(checkSumInfo{
-			Name:      "part.1",
-			Hash:      checkSums[index],
-			Algorithm: bitRotAlgo,
-		})
 		partsMetadata[index].Meta = metadata
 		partsMetadata[index].Stat.Size = size
 		partsMetadata[index].Stat.ModTime = modTime
 	}

 	// Write unique `xl.json` for each disk.
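
Note: the whole-object MD5 still comes from the single teeReader even though the stream is now drained one io.LimitReader part at a time; that is why each part is recorded with an empty ETag in the main hunk while the object-level digest is computed once over the entire stream. A self-contained illustration of that reader layering, with a toy part size standing in for globalPutPartSize (not part of the patch):

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

func main() {
	const partSize = 8 // toy stand-in for globalPutPartSize
	src := strings.NewReader("hello, erasure coded world")

	// Bytes read through tee are also written into the md5 state,
	// mirroring io.TeeReader(limitDataReader, mw) in PutObject.
	sum := md5.New()
	tee := io.TeeReader(src, sum)

	for partIdx := 1; ; partIdx++ {
		// Drain at most partSize bytes per iteration, like
		// io.LimitReader(teeReader, globalPutPartSize) above.
		n, err := io.Copy(io.Discard, io.LimitReader(tee, partSize))
		fmt.Printf("part.%d: %d bytes\n", partIdx, n)
		if err != nil || n < partSize {
			break // short (or empty) part: the stream is exhausted
		}
	}

	// One digest for the whole object, however many parts were cut.
	fmt.Println("md5:", hex.EncodeToString(sum.Sum(nil)))
}

An input whose length is an exact multiple of partSize produces one final zero-byte read here, which is the same edge the loop in the hunk handles with its partSizeWritten == 0 break.
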
@@ -639,6 +689,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 		newBuffer.Close()
 	}

+	// Object info is the same on all disks, so we can pick
+	// the metadata of the first disk.
+	xlMeta = partsMetadata[0]
+
 	objInfo = ObjectInfo{
 		IsDir:  false,
 		Bucket: bucket,
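
Closing note: the PrepareFile calls in the main hunk preallocate each part's shard using xl.sizeOnDisk, which is also outside this diff. A rough sketch of the per-disk arithmetic it presumably performs (assumed rounding; not the canonical implementation): erasure coding slices every blockSize chunk into shards of ceil(blockSize/dataBlocks) bytes, one per disk, so each disk holds about one dataBlocks-th of the part plus per-block rounding overhead.

// Assumed per-disk footprint of a part of length size: each full block
// contributes ceil(blockSize/dataBlocks) bytes to every disk, and a
// trailing partial block contributes ceil(lastBlock/dataBlocks).
// The real xl.sizeOnDisk may differ in its rounding details.
func sizeOnDisk(size, blockSize, dataBlocks int64) int64 {
	ceil := func(a, b int64) int64 { return (a + b - 1) / b }

	diskSize := (size / blockSize) * ceil(blockSize, dataBlocks)
	if lastBlock := size % blockSize; lastBlock != 0 {
		diskSize += ceil(lastBlock, dataBlocks)
	}
	return diskSize
}
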