diff --git a/erasure-readfile.go b/erasure-readfile.go
index e79dc0149..750ed010a 100644
--- a/erasure-readfile.go
+++ b/erasure-readfile.go
@@ -65,38 +65,6 @@ func isSuccessDataBlocks(enBlocks [][]byte, dataBlocks int) bool {
 	return successDataBlocksCount >= dataBlocks
 }
 
-// Return ordered partsMetadata depeinding on distribution.
-func getOrderedPartsMetadata(distribution []int, partsMetadata []xlMetaV1) (orderedPartsMetadata []xlMetaV1) {
-	orderedPartsMetadata = make([]xlMetaV1, len(partsMetadata))
-	for index := range partsMetadata {
-		blockIndex := distribution[index]
-		orderedPartsMetadata[blockIndex-1] = partsMetadata[index]
-	}
-	return orderedPartsMetadata
-}
-
-// getOrderedDisks - get ordered disks from erasure distribution.
-// returns ordered slice of disks from their actual distribution.
-func getOrderedDisks(distribution []int, disks []StorageAPI) (orderedDisks []StorageAPI) {
-	orderedDisks = make([]StorageAPI, len(disks))
-	// From disks gets ordered disks.
-	for index := range disks {
-		blockIndex := distribution[index]
-		orderedDisks[blockIndex-1] = disks[index]
-	}
-	return orderedDisks
-}
-
-// Return ordered CheckSums depending on the distribution.
-func getOrderedCheckSums(distribution []int, blockCheckSums []checkSumInfo) (orderedBlockCheckSums []checkSumInfo) {
-	orderedBlockCheckSums = make([]checkSumInfo, len(blockCheckSums))
-	for index := range blockCheckSums {
-		blockIndex := distribution[index]
-		orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index]
-	}
-	return orderedBlockCheckSums
-}
-
 // Return readable disks slice from which we can read parallelly.
 func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDisks []StorageAPI, nextIndex int, err error) {
 	readDisks = make([]StorageAPI, len(orderedDisks))
@@ -192,27 +160,16 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
 // are decoded into a data block. Data block is trimmed for given offset and length,
 // then written to given writer. This function also supports bit-rot detection by
 // verifying checksum of individual block's checksum.
-func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64, totalLength int64) (int64, error) {
+func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string) (int64, error) {
 	// Offset and length cannot be negative.
 	if offset < 0 || length < 0 {
 		return 0, errUnexpected
 	}
 
-	// Pick one erasure info.
-	eInfo := pickValidErasureInfo(eInfos)
-
-	// Gather previously calculated block checksums.
-	blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName)
-
-	// []orderedDisks will have first eInfo.DataBlocks disks as data
-	// disks and rest will be parity.
-	orderedDisks := getOrderedDisks(eInfo.Distribution, disks)
-	orderedBlockCheckSums := getOrderedCheckSums(eInfo.Distribution, blockCheckSums)
-
 	// bitRotVerify verifies if the file on a particular disk doesn't have bitrot
 	// by verifying the hash of the contents of the file.
 	bitRotVerify := func() func(diskIndex int) bool {
-		verified := make([]bool, len(orderedDisks))
+		verified := make([]bool, len(disks))
 		// Return closure so that we have reference to []verified and
 		// not recalculate the hash on it every time the function is
 		// called for the same disk.
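Side note on the relocated helpers: all three apply the same permutation, which is easy to demonstrate in isolation. A minimal self-contained sketch, using a made-up 4-disk distribution and strings standing in for StorageAPI:

```go
package main

import "fmt"

func main() {
	// distribution[i] is the 1-based erasure-block position of the chunk
	// held by disks[i], so disks[i] moves to ordered[distribution[i]-1].
	distribution := []int{3, 1, 4, 2}
	disks := []string{"disk0", "disk1", "disk2", "disk3"} // stand-ins for StorageAPI

	ordered := make([]string, len(disks))
	for i := range disks {
		ordered[distribution[i]-1] = disks[i]
	}
	fmt.Println(ordered) // [disk1 disk3 disk0 disk2]
}
```

With this change the permutation is applied once per GetObject call (see xl-v1-object.go below) instead of inside every erasureReadFile call, so erasureReadFile can assume the first dataBlocks entries of disks are data disks and the rest parity.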
@@ -222,7 +179,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 				return true
 			}
 			// Is this a valid block?
-			isValid := isValidBlock(orderedDisks[diskIndex], volume, path, orderedBlockCheckSums[diskIndex])
+			isValid := isValidBlock(disks[diskIndex], volume, path, checkSums[diskIndex])
 			verified[diskIndex] = isValid
 			return isValid
 		}
@@ -235,32 +192,32 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 	// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
 	// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
 	// DataBlocks. The extra space will have 0-padding.
-	chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks)
+	chunkSize := getEncodedBlockLen(blockSize, dataBlocks)
 
 	// Get start and end block, also bytes to be skipped based on the input offset.
-	startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, eInfo.BlockSize)
+	startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, blockSize)
 
 	// For each block, read chunk from each disk. If we are able to read all the data disks then we don't
 	// need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
 	// of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
 	for block := startBlock; bytesWritten < length; block++ {
 		// Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
-		enBlocks := make([][]byte, len(orderedDisks))
+		enBlocks := make([][]byte, len(disks))
 
 		// enBlocks data can have 0-padding hence we need to figure the exact number
 		// of bytes we want to read from enBlocks.
-		blockSize := eInfo.BlockSize
+		blockSize := blockSize
 
 		// curChunkSize is chunkSize until end block.
 		curChunkSize := chunkSize
 
 		// We have endBlock, verify if we need to have padding.
-		if block == endBlock && (totalLength%eInfo.BlockSize != 0) {
+		if block == endBlock && (totalLength%blockSize != 0) {
 			// If this is the last block and size of the block is < BlockSize.
-			curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks)
+			curChunkSize = getEncodedBlockLen(totalLength%blockSize, dataBlocks)
 
 			// For the last block, the block size can be less than BlockSize.
-			blockSize = totalLength % eInfo.BlockSize
+			blockSize = totalLength % blockSize
 		}
 
 		// Block offset.
@@ -277,25 +234,29 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 			// readDisks - disks from which we need to read in parallel.
 			var readDisks []StorageAPI
 			var err error
-			readDisks, nextIndex, err = getReadDisks(orderedDisks, nextIndex, eInfo.DataBlocks)
+			// Get readable disks slice from which we can read in parallel.
+			readDisks, nextIndex, err = getReadDisks(disks, nextIndex, dataBlocks)
 			if err != nil {
 				return bytesWritten, err
 			}
-			parallelRead(volume, path, readDisks, orderedDisks, enBlocks, blockOffset, curChunkSize, bitRotVerify)
-			if isSuccessDecodeBlocks(enBlocks, eInfo.DataBlocks) {
+			// Issue a parallel read across the disks specified in readDisks.
+			parallelRead(volume, path, readDisks, disks, enBlocks, blockOffset, curChunkSize, bitRotVerify)
+			if isSuccessDecodeBlocks(enBlocks, dataBlocks) {
 				// If enough blocks are available to do rs.Reconstruct()
 				break
 			}
-			if nextIndex == len(orderedDisks) {
+			if nextIndex == len(disks) {
 				// No more disks to read from.
 				return bytesWritten, errXLReadQuorum
 			}
+			// We do not have enough data blocks to reconstruct the data,
+			// hence continue the for-loop till we have enough data blocks.
 		}
 
 		// If we have all the data blocks no need to decode, continue to write.
-		if !isSuccessDataBlocks(enBlocks, eInfo.DataBlocks) {
+		if !isSuccessDataBlocks(enBlocks, dataBlocks) {
 			// Reconstruct the missing data blocks.
-			if err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks); err != nil {
+			if err := decodeData(enBlocks, dataBlocks, parityBlocks); err != nil {
 				return bytesWritten, err
 			}
 		}
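The chunk-size arithmetic this loop relies on can be sanity-checked in isolation. A minimal sketch, assuming getEncodedBlockLen is plain ceiling division (the diff does not show its body, so this is an inference from the comments above):

```go
package main

import "fmt"

// encodedBlockLen mirrors what the comments describe: the per-disk chunk
// is blockSize split across dataBlocks and rounded up, so that
// chunkSize*dataBlocks >= blockSize, with the surplus zero-padded.
func encodedBlockLen(blockSize int64, dataBlocks int) int64 {
	return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks)
}

func main() {
	var blockSize int64 = 10 * 1024 * 1024 // 10 MiB erasure block
	dataBlocks := 6

	chunkSize := encodedBlockLen(blockSize, dataBlocks)
	fmt.Println(chunkSize)                               // 1747627
	fmt.Println(chunkSize*int64(dataBlocks) - blockSize) // 2 bytes of 0-padding
}
```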
@@ -314,7 +275,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 		}
 
 		// Write data blocks.
-		n, err := writeDataBlocks(writer, enBlocks, eInfo.DataBlocks, outOffset, outSize)
+		n, err := writeDataBlocks(writer, enBlocks, dataBlocks, outOffset, outSize)
 		if err != nil {
 			return bytesWritten, err
 		}
@@ -337,34 +298,21 @@ func (e erasureInfo) PartObjectChecksum(partName string) checkSumInfo {
 	return checkSumInfo{}
 }
 
-// xlMetaPartBlockChecksums - get block checksums for a given part.
-func metaPartBlockChecksums(disks []StorageAPI, eInfos []erasureInfo, partName string) (blockCheckSums []checkSumInfo) {
-	for index := range disks {
-		if eInfos[index].IsValid() {
-			// Save the read checksums for a given part.
-			blockCheckSums = append(blockCheckSums, eInfos[index].PartObjectChecksum(partName))
-		} else {
-			blockCheckSums = append(blockCheckSums, checkSumInfo{})
-		}
-	}
-	return blockCheckSums
-}
-
 // isValidBlock - calculates the checksum hash for the block and
 // validates if its correct returns true for valid cases, false otherwise.
-func isValidBlock(disk StorageAPI, volume, path string, blockCheckSum checkSumInfo) (ok bool) {
+func isValidBlock(disk StorageAPI, volume, path string, checksum string) (ok bool) {
 	// Disk is not available, not a valid block.
 	if disk == nil {
 		return false
 	}
 	// Read everything for a given block and calculate hash.
-	hashWriter := newHash(blockCheckSum.Algorithm)
+	hashWriter := newHash("blake2b")
 	hashBytes, err := hashSum(disk, volume, path, hashWriter)
 	if err != nil {
 		errorIf(err, "Unable to calculate checksum %s/%s", volume, path)
 		return false
 	}
-	return hex.EncodeToString(hashBytes) == blockCheckSum.Hash
+	return hex.EncodeToString(hashBytes) == checksum
 }
 
 // decodeData - decode encoded blocks.
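One behavioral change worth flagging in isValidBlock: the hash algorithm is now hard-coded to "blake2b" rather than read from checkSumInfo.Algorithm. A rough sketch of the comparison being performed, assuming newHash("blake2b") resolves to BLAKE2b-512 (the diff does not show newHash, so the exact variant is an assumption):

```go
package main

import (
	"encoding/hex"
	"fmt"

	"golang.org/x/crypto/blake2b"
)

func main() {
	chunk := []byte("erasure-coded chunk as stored on one disk") // placeholder data

	// Hash the stored chunk the way the bitrot check would, then compare
	// the hex digest against the checksum recorded at write time.
	sum := blake2b.Sum512(chunk)
	computed := hex.EncodeToString(sum[:])

	stored := computed // in GetObject this value comes from GetCheckSum(partName)
	fmt.Println(computed == stored) // true => no bitrot detected
}
```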
diff --git a/xl-v1-object.go b/xl-v1-object.go
index ff7660fa1..d02883205 100644
--- a/xl-v1-object.go
+++ b/xl-v1-object.go
@@ -86,6 +86,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 			break
 		}
 	}
+	onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks)
+	metaArr = getOrderedPartsMetadata(xlMeta.Erasure.Distribution, metaArr)
 
 	// Reply back invalid range if the input offset and length fall out of range.
 	if startOffset > xlMeta.Stat.Size || length > xlMeta.Stat.Size {
@@ -109,12 +111,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 		return toObjectErr(err, bucket, object)
 	}
 
-	// Collect all the previous erasure infos across the disk.
-	var eInfos []erasureInfo
-	for index := range onlineDisks {
-		eInfos = append(eInfos, metaArr[index].Erasure)
-	}
-
 	// Save the writer.
 	mw := writer
 
@@ -173,8 +169,17 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 			readSize = length - totalBytesRead
 		}
 
+		// Get the checksums of the current part.
+		checkSums := make([]string, len(onlineDisks))
+		for index := range onlineDisks {
+			checkSums[index], _, err = metaArr[index].GetCheckSum(partName)
+			if err != nil {
+				return toObjectErr(err, bucket, object)
+			}
+		}
+
 		// Start reading the part name.
-		n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize)
+		n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partOffset, readSize, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, checkSums)
 		if err != nil {
 			return toObjectErr(err, bucket, object)
 		}
diff --git a/xl-v1-utils.go b/xl-v1-utils.go
index 23ca47aa1..2e38dd2de 100644
--- a/xl-v1-utils.go
+++ b/xl-v1-utils.go
@@ -142,3 +142,25 @@ func unionChecksumInfos(cur []checkSumInfo, updated []checkSumInfo, curPartName
 	}
 	return finalChecksums
 }
+
+// Return ordered partsMetadata depending on distribution.
+func getOrderedPartsMetadata(distribution []int, partsMetadata []xlMetaV1) (orderedPartsMetadata []xlMetaV1) {
+	orderedPartsMetadata = make([]xlMetaV1, len(partsMetadata))
+	for index := range partsMetadata {
+		blockIndex := distribution[index]
+		orderedPartsMetadata[blockIndex-1] = partsMetadata[index]
+	}
+	return orderedPartsMetadata
+}
+
+// getOrderedDisks - get ordered disks from erasure distribution.
+// returns ordered slice of disks from their actual distribution.
+func getOrderedDisks(distribution []int, disks []StorageAPI) (orderedDisks []StorageAPI) {
+	orderedDisks = make([]StorageAPI, len(disks))
+	// From disks gets ordered disks.
+	for index := range disks {
+		blockIndex := distribution[index]
+		orderedDisks[blockIndex-1] = disks[index]
+	}
+	return orderedDisks
+}
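Since GetObject now applies the same distribution to onlineDisks and metaArr before calling erasureReadFile, index i of both slices (and of the derived checkSums slice) must refer to the same erasure block. A small self-contained check of that invariant, with strings standing in for StorageAPI and xlMetaV1:

```go
package main

import "fmt"

// orderBy applies the permutation used by getOrderedDisks and
// getOrderedPartsMetadata: items[i] moves to slot distribution[i]-1.
func orderBy(distribution []int, items []string) []string {
	ordered := make([]string, len(items))
	for i := range items {
		ordered[distribution[i]-1] = items[i]
	}
	return ordered
}

func main() {
	distribution := []int{2, 3, 1}
	disks := []string{"diskA", "diskB", "diskC"}
	metas := []string{"metaA", "metaB", "metaC"} // per-disk xl.json stand-ins

	orderedDisks := orderBy(distribution, disks)
	orderedMetas := orderBy(distribution, metas)

	// The same permutation on both slices keeps them aligned: the
	// checksum read from orderedMetas[i] belongs to orderedDisks[i].
	for i := range orderedDisks {
		fmt.Println(orderedDisks[i], orderedMetas[i])
	}
	// Output:
	// diskC metaC
	// diskA metaA
	// diskB metaB
}
```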