XL: Bring in sha512 checksum support. (#1797)

master
Harshavardhana 9 years ago
parent db2fdbf38d
commit c493ab5d0d
  1. 10
      erasure-appendfile.go
  2. 182
      erasure-readfile.go
  3. 38
      erasure-utils.go
  4. 60
      erasure.go
  5. 2
      fs-v1-metadata.go
  6. 10
      xl-v1-healing.go
  7. 164
      xl-v1-metadata.go
  8. 96
      xl-v1-multipart.go
  9. 129
      xl-v1-object.go

@ -19,16 +19,16 @@ package main
import "sync"
// AppendFile - append data buffer at path.
func (e erasure) AppendFile(volume, path string, dataBuffer []byte) (n int64, err error) {
func (e erasureConfig) AppendFile(volume, path string, dataBuffer []byte) (n int64, err error) {
// Split the input buffer into data and parity blocks.
var blocks [][]byte
blocks, err = e.ReedSolomon.Split(dataBuffer)
blocks, err = e.reedSolomon.Split(dataBuffer)
if err != nil {
return 0, err
}
// Encode parity blocks using data blocks.
err = e.ReedSolomon.Encode(blocks)
err = e.reedSolomon.Encode(blocks)
if err != nil {
return 0, err
}
@ -55,6 +55,10 @@ func (e erasure) AppendFile(volume, path string, dataBuffer []byte) (n int64, er
wErrs[index] = errUnexpected
return
}
// Calculate hash.
e.hashWriters[blockIndex].Write(blocks[blockIndex])
// Successfully wrote.
wErrs[index] = nil
}(index, disk)
}

@ -16,82 +16,136 @@
package main
import "errors"
import (
"encoding/hex"
"errors"
)
// ReadFile - decoded erasure coded file.
func (e erasure) ReadFile(volume, path string, startOffset int64, buffer []byte) (int64, error) {
// Calculate the current encoded block size.
curEncBlockSize := getEncodedBlockLen(int64(len(buffer)), e.DataBlocks)
offsetEncOffset := getEncodedBlockLen(startOffset, e.DataBlocks)
// Allocate encoded blocks up to storage disks.
enBlocks := make([][]byte, len(e.storageDisks))
// Counter to keep success data blocks.
var successDataBlocksCount = 0
var noReconstruct bool // Set for no reconstruction.
// Read from all the disks.
for index, disk := range e.storageDisks {
blockIndex := e.distribution[index] - 1
if disk == nil {
continue
// isValidBlock - calculates the checksum hash for the block and
// validates if its correct returns true for valid cases, false otherwise.
func (e erasureConfig) isValidBlock(volume, path string, blockIdx int) bool {
diskIndex := -1
// Find out the right disk index for the input block index.
for index, blockIndex := range e.distribution {
if blockIndex == blockIdx {
diskIndex = index
}
// Initialize shard slice and fill the data from each parts.
enBlocks[blockIndex] = make([]byte, curEncBlockSize)
// Read the necessary blocks.
_, err := disk.ReadFile(volume, path, offsetEncOffset, enBlocks[blockIndex])
if err != nil {
enBlocks[blockIndex] = nil
}
// Unknown block index requested, treat it as error.
if diskIndex == -1 {
return false
}
// Disk is not present, treat entire block to be non existent.
if e.storageDisks[diskIndex] == nil {
return false
}
// Read everything for a given block and calculate hash.
hashBytes, err := hashSum(e.storageDisks[diskIndex], volume, path, newHash(e.checkSumAlgo))
if err != nil {
return false
}
return hex.EncodeToString(hashBytes) == e.hashChecksums[diskIndex]
}
// ReadFile - decoded erasure coded file.
func (e erasureConfig) ReadFile(volume, path string, size int64, blockSize int64) ([]byte, error) {
// Return data buffer.
var buffer []byte
// Total size left
totalSizeLeft := size
// Starting offset for reading.
startOffset := int64(0)
// Write until each parts are read and exhausted.
for totalSizeLeft > 0 {
// Calculate the proper block size.
var curBlockSize int64
if blockSize < totalSizeLeft {
curBlockSize = blockSize
} else {
curBlockSize = totalSizeLeft
}
// Verify if we have successfully read all the data blocks.
if blockIndex < e.DataBlocks && enBlocks[blockIndex] != nil {
successDataBlocksCount++
// Set when we have all the data blocks and no
// reconstruction is needed, so that we can avoid
// erasure reconstruction.
noReconstruct = successDataBlocksCount == e.DataBlocks
if noReconstruct {
// Break out we have read all the data blocks.
break
// Calculate the current encoded block size.
curEncBlockSize := getEncodedBlockLen(curBlockSize, e.dataBlocks)
offsetEncOffset := getEncodedBlockLen(startOffset, e.dataBlocks)
// Allocate encoded blocks up to storage disks.
enBlocks := make([][]byte, len(e.storageDisks))
// Counter to keep success data blocks.
var successDataBlocksCount = 0
var noReconstruct bool // Set for no reconstruction.
// Read from all the disks.
for index, disk := range e.storageDisks {
blockIndex := e.distribution[index] - 1
if !e.isValidBlock(volume, path, blockIndex) {
continue
}
// Initialize shard slice and fill the data from each parts.
enBlocks[blockIndex] = make([]byte, curEncBlockSize)
// Read the necessary blocks.
_, err := disk.ReadFile(volume, path, offsetEncOffset, enBlocks[blockIndex])
if err != nil {
enBlocks[blockIndex] = nil
}
// Verify if we have successfully read all the data blocks.
if blockIndex < e.dataBlocks && enBlocks[blockIndex] != nil {
successDataBlocksCount++
// Set when we have all the data blocks and no
// reconstruction is needed, so that we can avoid
// erasure reconstruction.
noReconstruct = successDataBlocksCount == e.dataBlocks
if noReconstruct {
// Break out we have read all the data blocks.
break
}
}
}
}
// Check blocks if they are all zero in length, we have corruption return error.
if checkBlockSize(enBlocks) == 0 {
return 0, errDataCorrupt
}
// Check blocks if they are all zero in length, we have corruption return error.
if checkBlockSize(enBlocks) == 0 {
return nil, errDataCorrupt
}
// Verify if reconstruction is needed, proceed with reconstruction.
if !noReconstruct {
err := e.ReedSolomon.Reconstruct(enBlocks)
if err != nil {
return 0, err
// Verify if reconstruction is needed, proceed with reconstruction.
if !noReconstruct {
err := e.reedSolomon.Reconstruct(enBlocks)
if err != nil {
return nil, err
}
// Verify reconstructed blocks (parity).
ok, err := e.reedSolomon.Verify(enBlocks)
if err != nil {
return nil, err
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return nil, err
}
}
// Verify reconstructed blocks (parity).
ok, err := e.ReedSolomon.Verify(enBlocks)
// Get data blocks from encoded blocks.
dataBlocks, err := getDataBlocks(enBlocks, e.dataBlocks, int(curBlockSize))
if err != nil {
return 0, err
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return 0, err
return nil, err
}
}
// Get data blocks from encoded blocks.
dataBlocks, err := getDataBlocks(enBlocks, e.DataBlocks, len(buffer))
if err != nil {
return 0, err
}
// Copy data blocks.
buffer = append(buffer, dataBlocks...)
// Copy data blocks.
copy(buffer, dataBlocks)
// Negate the 'n' size written to client.
totalSizeLeft -= int64(len(dataBlocks))
// Relenquish memory.
dataBlocks = nil
// Increase the offset to move forward.
startOffset += int64(len(dataBlocks))
return int64(len(buffer)), nil
// Relenquish memory.
dataBlocks = nil
}
return buffer, nil
}

@ -16,7 +16,42 @@
package main
import "github.com/klauspost/reedsolomon"
import (
"crypto/sha512"
"hash"
"io"
"github.com/klauspost/reedsolomon"
)
// newHash - gives you a newly allocated hash depending on the input algorithm.
func newHash(algo string) hash.Hash {
switch algo {
case "sha512":
return sha512.New()
// Add new hashes here.
default:
return sha512.New()
}
}
func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) {
startOffset := int64(0)
// Read until io.EOF.
for {
buf := make([]byte, blockSizeV1)
n, err := disk.ReadFile(volume, path, startOffset, buf)
if err == io.EOF {
break
}
if err != nil && err != io.EOF {
return nil, err
}
writer.Write(buf[:n])
startOffset += n
}
return writer.Sum(nil), nil
}
// getDataBlocks - fetches the data block only part of the input encoded blocks.
func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []byte, err error) {
@ -31,6 +66,7 @@ func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []
if size < curBlockSize {
return nil, reedsolomon.ErrShortData
}
write := curBlockSize
for _, block := range blocks {
if write < len(block) {

@ -16,21 +16,30 @@
package main
import "github.com/klauspost/reedsolomon"
import (
"encoding/hex"
"hash"
"github.com/klauspost/reedsolomon"
)
// erasure storage layer.
type erasure struct {
ReedSolomon reedsolomon.Encoder // Erasure encoder/decoder.
DataBlocks int
ParityBlocks int
storageDisks []StorageAPI
distribution []int
type erasureConfig struct {
reedSolomon reedsolomon.Encoder // Erasure encoder/decoder.
dataBlocks int // Calculated data disks.
storageDisks []StorageAPI // Initialized storage disks.
distribution []int // Erasure block distribution.
hashWriters []hash.Hash // Allocate hash writers.
// Carries hex checksums needed for validating Reads.
hashChecksums []string
checkSumAlgo string
}
// newErasure instantiate a new erasure.
func newErasure(disks []StorageAPI, distribution []int) *erasure {
func newErasure(disks []StorageAPI, distribution []int) *erasureConfig {
// Initialize E.
e := &erasure{}
e := &erasureConfig{}
// Calculate data and parity blocks.
dataBlocks, parityBlocks := len(disks)/2, len(disks)/2
@ -40,9 +49,8 @@ func newErasure(disks []StorageAPI, distribution []int) *erasure {
fatalIf(err, "Unable to initialize reedsolomon package.")
// Save the reedsolomon.
e.DataBlocks = dataBlocks
e.ParityBlocks = parityBlocks
e.ReedSolomon = rs
e.dataBlocks = dataBlocks
e.reedSolomon = rs
// Save all the initialized storage disks.
e.storageDisks = disks
@ -53,3 +61,31 @@ func newErasure(disks []StorageAPI, distribution []int) *erasure {
// Return successfully initialized.
return e
}
// SaveAlgo - FIXME.
func (e *erasureConfig) SaveAlgo(algo string) {
e.checkSumAlgo = algo
}
// Save hex encoded hashes - saves hashes that need to be validated
// during reads for each blocks.
func (e *erasureConfig) SaveHashes(hashes []string) {
e.hashChecksums = hashes
}
// InitHash - initializes new hash for all blocks.
func (e *erasureConfig) InitHash(algo string) {
e.hashWriters = make([]hash.Hash, len(e.storageDisks))
for index := range e.storageDisks {
e.hashWriters[index] = newHash(algo)
}
}
// GetHashes - returns a slice of hex encoded hash.
func (e erasureConfig) GetHashes() []string {
var hexHashes = make([]string, len(e.storageDisks))
for index, hashWriter := range e.hashWriters {
hexHashes[index] = hex.EncodeToString(hashWriter.Sum(nil))
}
return hexHashes
}

@ -52,7 +52,7 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
m.Parts = append(m.Parts, partInfo)
// Parts in fsMeta should be in sorted order by part number.
sort.Sort(byPartNumber(m.Parts))
sort.Sort(byObjectPartNumber(m.Parts))
}
// readFSMetadata - returns the object metadata `fs.json` content.

@ -41,19 +41,18 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
offset := int64(0)
var buffer = make([]byte, blockSizeV1)
n, err := disk.ReadFile(bucket, xlMetaPath, offset, buffer)
buffer, err := readAll(disk, bucket, xlMetaPath)
if err != nil {
errs[index] = err
return
}
err = json.Unmarshal(buffer[:n], &metadataArray[index])
err = json.Unmarshal(buffer, &metadataArray[index])
if err != nil {
// Unable to parse xl.json, set error.
errs[index] = err
return
}
// Relinquish buffer.
buffer = nil
errs[index] = nil
}(index, disk)
@ -151,9 +150,8 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
// - xlMetaV1
// - bool value indicating if healing is needed.
// - error if any.
func (xl xlObjects) listOnlineDisks(bucket, object string) (onlineDisks []StorageAPI, version int64, err error) {
func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, version int64, err error) {
onlineDisks = make([]StorageAPI, len(xl.storageDisks))
partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
if err = xl.reduceError(errs); err != nil {
if err == errFileNotFound {
// For file not found, treat as if disks are available

@ -18,6 +18,7 @@ package main
import (
"encoding/json"
"fmt"
"path"
"sort"
"sync"
@ -39,12 +40,19 @@ type objectPartInfo struct {
Size int64 `json:"size"`
}
// byPartName is a collection satisfying sort.Interface.
type byPartNumber []objectPartInfo
// byObjectPartNumber is a collection satisfying sort.Interface.
type byObjectPartNumber []objectPartInfo
func (t byPartNumber) Len() int { return len(t) }
func (t byPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
func (t byObjectPartNumber) Len() int { return len(t) }
func (t byObjectPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t byObjectPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
// checkSumInfo - carries checksums of individual part.
type checkSumInfo struct {
Name string `json:"name"`
Algorithm string `json:"algorithm"`
Hash string `json:"hash"`
}
// A xlMetaV1 represents a metadata header mapping keys to sets of values.
type xlMetaV1 struct {
@ -56,17 +64,13 @@ type xlMetaV1 struct {
Version int64 `json:"version"`
} `json:"stat"`
Erasure struct {
Algorithm string `json:"algorithm"`
DataBlocks int `json:"data"`
ParityBlocks int `json:"parity"`
BlockSize int64 `json:"blockSize"`
Index int `json:"index"`
Distribution []int `json:"distribution"`
Checksum []struct {
Name string `json:"name"`
Algorithm string `json:"algorithm"`
Hash string `json:"hash"`
} `json:"checksum"`
Algorithm string `json:"algorithm"`
DataBlocks int `json:"data"`
ParityBlocks int `json:"parity"`
BlockSize int64 `json:"blockSize"`
Index int `json:"index"`
Distribution []int `json:"distribution"`
Checksum []checkSumInfo `json:"checksum,omitempty"`
} `json:"erasure"`
Minio struct {
Release string `json:"release"`
@ -89,6 +93,11 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
return xlMeta
}
// IsValid - is validate tells if the format is sane.
func (m xlMetaV1) IsValid() bool {
return m.Version == "1" && m.Format == "xl"
}
// ObjectPartIndex - returns the index of matching object part number.
func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) {
for i, part := range m.Parts {
@ -100,6 +109,17 @@ func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) {
return -1
}
// ObjectCheckIndex - returns the checksum for the part name from the checksum slice.
func (m xlMetaV1) PartObjectChecksum(partNumber int) checkSumInfo {
partName := fmt.Sprintf("object%d", partNumber)
for _, checksum := range m.Erasure.Checksum {
if checksum.Name == partName {
return checksum
}
}
return checkSumInfo{}
}
// AddObjectPart - add a new object part in order.
func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) {
partInfo := objectPartInfo{
@ -121,11 +141,11 @@ func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
m.Parts = append(m.Parts, partInfo)
// Parts in xlMeta should be in sorted order by part number.
sort.Sort(byPartNumber(m.Parts))
sort.Sort(byObjectPartNumber(m.Parts))
}
// objectToPartOffset - translate offset of an object to offset of its individual part.
func (m xlMetaV1) objectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) {
// ObjectToPartOffset - translate offset of an object to offset of its individual part.
func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) {
partOffset = offset
// Seek until object offset maps to a particular part offset.
for i, part := range m.Parts {
@ -146,6 +166,18 @@ func (m xlMetaV1) objectToPartOffset(offset int64) (partIndex int, partOffset in
return 0, 0, InvalidRange{}
}
// pickValidXLMeta - picks one valid xlMeta content and returns from a
// slice of xlmeta content. If no value is found this function panics
// and dies.
func pickValidXLMeta(xlMetas []xlMetaV1) xlMetaV1 {
for _, xlMeta := range xlMetas {
if xlMeta.IsValid() {
return xlMeta
}
}
panic("Unable to look for valid XL metadata content")
}
// readXLMetadata - returns the object metadata `xl.json` content from
// one of the disks picked at random.
func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) {
@ -160,7 +192,10 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
if err == nil {
err = json.Unmarshal(buffer, &xlMeta)
if err == nil {
return xlMeta, nil
if xlMeta.IsValid() {
return xlMeta, nil
}
err = errDataCorrupt
}
}
xlJSONErrCount++ // Update error count.
@ -209,12 +244,85 @@ func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix
return nil
}
// writeXLMetadata - writes `xl.json` to a single disk.
func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) error {
jsonFile := path.Join(prefix, xlMetaJSONFile)
// Marshal json.
metadataBytes, err := json.Marshal(&xlMeta)
if err != nil {
return err
}
// Persist marshalled data.
n, err := disk.AppendFile(bucket, jsonFile, metadataBytes)
if err != nil {
return err
}
if n != int64(len(metadataBytes)) {
return errUnexpected
}
return nil
}
// checkSumAlgorithm - get the algorithm required for checksum
// verification for a given part. Allocates a new hash and returns.
func checkSumAlgorithm(xlMeta xlMetaV1, partIdx int) string {
partCheckSumInfo := xlMeta.PartObjectChecksum(partIdx)
return partCheckSumInfo.Algorithm
}
// xlMetaPartBlockChecksums - get block checksums for a given part.
func (xl xlObjects) metaPartBlockChecksums(xlMetas []xlMetaV1, partIdx int) (blockCheckSums []string) {
for index := range xl.storageDisks {
// Save the read checksums for a given part.
blockCheckSums = append(blockCheckSums, xlMetas[index].PartObjectChecksum(partIdx).Hash)
}
return blockCheckSums
}
// writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMetaV1) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
// Start writing `xl.json` to all disks in parallel.
for index, disk := range xl.storageDisks {
wg.Add(1)
// Write `xl.json` in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
// Pick one xlMeta for a disk at index.
xlMetas[index].Erasure.Index = index + 1
// Write unique `xl.json` for a disk at index.
if err := writeXLMetadata(disk, bucket, prefix, xlMetas[index]); err != nil {
mErrs[index] = err
return
}
mErrs[index] = nil
}(index, disk)
}
// Wait for all the routines.
wg.Wait()
// Return the first error.
for _, err := range mErrs {
if err == nil {
continue
}
return err
}
return nil
}
// writeXLMetadata - write `xl.json` on all disks in order.
func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
jsonFile := path.Join(prefix, xlMetaJSONFile)
// Start writing `xl.json` to all disks in parallel.
for index, disk := range xl.storageDisks {
wg.Add(1)
@ -225,21 +333,11 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
// Save the disk order index.
metadata.Erasure.Index = index + 1
metadataBytes, err := json.Marshal(&metadata)
if err != nil {
// Write xl metadata.
if err := writeXLMetadata(disk, bucket, prefix, metadata); err != nil {
mErrs[index] = err
return
}
// Persist marshalled data.
n, mErr := disk.AppendFile(bucket, jsonFile, metadataBytes)
if mErr != nil {
mErrs[index] = mErr
return
}
if n != int64(len(metadataBytes)) {
mErrs[index] = errUnexpected
return
}
mErrs[index] = nil
}(index, disk, xlMeta)
}

@ -110,39 +110,32 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
}
uploadIDLocked := false
defer func() {
if uploadIDLocked {
nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
}
}()
// Figure out the erasure distribution first.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
uploadIDLocked = true
uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
// List all online disks.
onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
onlineDisks, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Unlock the uploadID so that parallel uploads of parts can happen.
nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
uploadIDLocked = false
// Pick one from the first valid metadata.
xlMeta := pickValidXLMeta(partsMetadata)
// Initialize a new erasure with online disks and new distribution.
erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
// Initialize sha512 hash.
erasure.InitHash("sha512")
partSuffix := fmt.Sprintf("object%d", partID)
tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix)
@ -182,31 +175,12 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
}
}
// Hold lock as we are updating UPLODID/xl.json and renaming the part file from tmp location.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
uploadIDLocked = true
if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
}
// List all online disks.
onlineDisks, higherVersion, err = xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Increment version only if we have online disks less than configured storage disks.
if diskCount(onlineDisks) < len(xl.storageDisks) {
higherVersion++
}
xlMeta, err = xl.readXLMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Rename temporary part file to its final location.
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
partPath := path.Join(uploadIDPath, partSuffix)
err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, partPath)
@ -214,10 +188,32 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
// Once part is successfully committed, proceed with updating XL metadata.
xlMeta.Stat.Version = higherVersion
// Add the current part.
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
// Get calculated hash checksums from erasure to save in `xl.json`.
hashChecksums := erasure.GetHashes()
checkSums := make([]checkSumInfo, len(xl.storageDisks))
for index := range xl.storageDisks {
blockIndex := xlMeta.Erasure.Distribution[index] - 1
checkSums[blockIndex] = checkSumInfo{
Name: partSuffix,
Algorithm: "sha512",
Hash: hashChecksums[blockIndex],
}
}
for index := range partsMetadata {
blockIndex := xlMeta.Erasure.Distribution[index] - 1
partsMetadata[index].Parts = xlMeta.Parts
partsMetadata[index].Erasure.Checksum = append(partsMetadata[index].Erasure.Checksum, checkSums[blockIndex])
}
// Write all the checksum metadata.
tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
// Write unique `xl.json` each disk.
if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
@ -258,6 +254,7 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM
result := ListPartsInfo{}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath)
@ -352,14 +349,18 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
// Read the current `xl.json`.
xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
if err = xl.reduceError(errs); err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Calculate full object size.
var objectSize int64
// Pick one from the first valid metadata.
xlMeta := pickValidXLMeta(partsMetadata)
// Save current xl meta for validation.
var currentXLMeta = xlMeta
@ -405,7 +406,16 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
xlMeta.Meta["md5Sum"] = s3MD5
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
// Update all xl metadata, make sure to not modify fields like
// checksum which are different on each disks.
for index := range partsMetadata {
partsMetadata[index].Stat = xlMeta.Stat
partsMetadata[index].Meta = xlMeta.Meta
partsMetadata[index].Parts = xlMeta.Parts
}
// Write unique `xl.json` for each disk.
if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)

@ -31,63 +31,83 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
nsMutex.RLock(bucket, object)
defer nsMutex.RUnlock(bucket, object)
// Read metadata associated with the object.
xlMeta, err := xl.readXLMetadata(bucket, object)
if err != nil {
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
if err := xl.reduceError(errs); err != nil {
return toObjectErr(err, bucket, object)
}
// List all online disks.
onlineDisks, _, err := xl.listOnlineDisks(bucket, object)
onlineDisks, _, err := xl.listOnlineDisks(partsMetadata, errs)
if err != nil {
return toObjectErr(err, bucket, object)
}
// Initialize a new erasure with online disks, with previous block distribution.
erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
// Pick one from the first valid metadata.
xlMeta := partsMetadata[0]
if !xlMeta.IsValid() {
for _, partMetadata := range partsMetadata {
if partMetadata.IsValid() {
xlMeta = partMetadata
break
}
}
}
// Get part index offset.
partIndex, partOffset, err := xlMeta.objectToPartOffset(startOffset)
partIndex, partOffset, err := xlMeta.ObjectToPartOffset(startOffset)
if err != nil {
return toObjectErr(err, bucket, object)
}
// Read from all parts.
for ; partIndex < len(xlMeta.Parts); partIndex++ {
part := xlMeta.Parts[partIndex]
totalLeft := part.Size
beginOffset := int64(0)
for totalLeft > 0 {
var curBlockSize int64
if xlMeta.Erasure.BlockSize < totalLeft {
curBlockSize = xlMeta.Erasure.BlockSize
} else {
curBlockSize = totalLeft
}
var buffer = make([]byte, curBlockSize)
var n int64
n, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), beginOffset, buffer)
// Save the current part name and size.
partName := xlMeta.Parts[partIndex].Name
partSize := xlMeta.Parts[partIndex].Size
// Initialize a new erasure with online disks, with previous
// block distribution for each part reads.
erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
// Set previously calculated block checksums and algorithm for validation.
erasure.SaveAlgo(checkSumAlgorithm(xlMeta, partIndex+1))
erasure.SaveHashes(xl.metaPartBlockChecksums(partsMetadata, partIndex+1))
// Data block size.
blockSize := xlMeta.Erasure.BlockSize
// Start reading the part name.
var buffer []byte
buffer, err = erasure.ReadFile(bucket, pathJoin(object, partName), partSize, blockSize)
if err != nil {
return err
}
// Copy to client until length requested.
if length > int64(len(buffer)) {
var m int64
m, err = io.Copy(writer, bytes.NewReader(buffer[partOffset:]))
if err != nil {
return err
}
if length > int64(len(buffer)) {
var m int64
m, err = io.Copy(writer, bytes.NewReader(buffer[partOffset:]))
if err != nil {
return err
}
length -= m
} else {
_, err = io.CopyN(writer, bytes.NewReader(buffer[partOffset:]), length)
if err != nil {
return err
}
return nil
length -= m
} else {
_, err = io.CopyN(writer, bytes.NewReader(buffer[partOffset:]), length)
if err != nil {
return err
}
totalLeft -= n
beginOffset += n
// Reset part offset to 0 to read rest of the part from the beginning.
partOffset = 0
return nil
}
// Relinquish memory.
buffer = nil
// Reset part offset to 0 to read rest of the part from the beginning.
partOffset = 0
}
// Return success.
return nil
}
@ -220,8 +240,11 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Initialize xl meta.
xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks)
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
// List all online disks.
onlineDisks, higherVersion, err := xl.listOnlineDisks(bucket, object)
onlineDisks, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
@ -234,6 +257,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Initialize a new erasure with online disks and new distribution.
erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
// Initialize sha512 hash.
erasure.InitHash("sha512")
// Initialize md5 writer.
md5Writer := md5.New()
@ -305,10 +331,33 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
xlMeta.Stat.Size = size
xlMeta.Stat.ModTime = modTime
xlMeta.Stat.Version = higherVersion
// Add the final part.
xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size)
// Write `xl.json` metadata.
if err = xl.writeXLMetadata(minioMetaBucket, tempObj, xlMeta); err != nil {
// Get hash checksums.
hashChecksums := erasure.GetHashes()
// Save the checksums.
checkSums := make([]checkSumInfo, len(xl.storageDisks))
for index := range xl.storageDisks {
blockIndex := xlMeta.Erasure.Distribution[index] - 1
checkSums[blockIndex] = checkSumInfo{
Name: "object1",
Algorithm: "sha512",
Hash: hashChecksums[blockIndex],
}
}
// Update all the necessary fields making sure that checkSum field
// is different for each disks.
for index := range partsMetadata {
blockIndex := xlMeta.Erasure.Distribution[index] - 1
partsMetadata[index] = xlMeta
partsMetadata[index].Erasure.Checksum = append(partsMetadata[index].Erasure.Checksum, checkSums[blockIndex])
}
// Write unique `xl.json` for each disk.
if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempObj, partsMetadata); err != nil {
return "", toObjectErr(err, bucket, object)
}

Loading…
Cancel
Save