You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
minio/erasure-createfile.go

163 lines
4.6 KiB

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/hex"
"hash"
"io"
"sync"
"github.com/klauspost/reedsolomon"
)
// erasureCreateFile - consumes the data stream one block at a time,
// erasure codes every block and appends the encoded shards to all the
// disks. A checksum is accumulated per shard while writing, for future
// bit-rot protection.
//
// Returns the updated erasure infos (one slot per disk, filled in only
// where the matching entry of eInfos is valid), the total number of
// bytes read from data, and the first read, encode or write error.
func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo, writeQuorum int) (newEInfos []erasureInfo, size int64, err error) {
	// Any single valid erasure info is used as the reference for
	// block size, shard counts and distribution.
	refInfo := pickValidErasureInfo(eInfos)

	// One block sized scratch buffer, reused for every read.
	blockBuf := make([]byte, refInfo.BlockSize)
	hashWriters := newHashWriters(len(disks))

	// Loop until the reader reports io.EOF.
	for {
		nRead, readErr := io.ReadFull(data, blockBuf)
		if readErr == io.EOF {
			// io.EOF before any byte was ever read means the stream is
			// empty: skip erasure coding and append empty shards so that
			// a 0byte file gets created. When data was already written
			// there is nothing more to append.
			if size == 0 {
				emptyShards := make([][]byte, len(disks))
				if writeErr := appendFile(disks, volume, path, emptyShards, refInfo.Distribution, hashWriters, writeQuorum); writeErr != nil {
					return nil, 0, writeErr
				}
			}
			break
		}
		// io.ErrUnexpectedEOF only signals a short final block, which
		// still has to be encoded and written below.
		if readErr != nil && readErr != io.ErrUnexpectedEOF {
			return nil, 0, readErr
		}
		size += int64(nRead)

		// Erasure code just the bytes that were actually read.
		shards, encodeErr := encodeData(blockBuf[:nRead], refInfo.DataBlocks, refInfo.ParityBlocks)
		if encodeErr != nil {
			return nil, 0, encodeErr
		}

		// Append the shards on all disks.
		if writeErr := appendFile(disks, volume, path, shards, refInfo.Distribution, hashWriters, writeQuorum); writeErr != nil {
			return nil, 0, writeErr
		}
	}

	// Collect the final checksum of every shard, indexed by shard.
	checkSums := make([]checkSumInfo, len(disks))
	for diskIdx := range disks {
		shardIdx := refInfo.Distribution[diskIdx] - 1
		checkSums[shardIdx] = checkSumInfo{
			Name:      partName,
			Algorithm: "blake2b",
			Hash:      hex.EncodeToString(hashWriters[shardIdx].Sum(nil)),
		}
	}

	// Attach to each valid erasure info the checksum of the shard that
	// its own distribution assigns to that position.
	newEInfos = make([]erasureInfo, len(disks))
	for diskIdx, info := range eInfos {
		if !info.IsValid() {
			continue
		}
		shardIdx := info.Distribution[diskIdx] - 1
		updated := &newEInfos[diskIdx]
		*updated = info
		updated.Checksum = append(updated.Checksum, checkSums[shardIdx])
	}

	return newEInfos, size, nil
}
// encodeData - erasure codes dataBuffer with a reedsolomon encoder
// configured for dataBlocks data shards and parityBlocks parity shards,
// returning the shards as a 2 dimensional byte array. Errors from
// creating the encoder, splitting or encoding are returned unchanged.
func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, error) {
	encoder, err := reedsolomon.New(dataBlocks, parityBlocks)
	if err != nil {
		return nil, err
	}

	// Carve the input buffer up into data and parity shards.
	shards, err := encoder.Split(dataBuffer)
	if err != nil {
		return nil, err
	}

	// Compute the parity shards from the data shards.
	if err = encoder.Encode(shards); err != nil {
		return nil, err
	}

	return shards, nil
}
// appendFile - appends one set of encoded blocks at path, one block
// per disk, with all the disks written to in parallel. distribution
// holds, for each disk position, the 1-based index of the block meant
// for that disk. After a successful append the block is also fed to
// the hash writer of the same block index.
//
// Returns errXLWriteQuorum wrapped by toObjectErr when isQuorum
// rejects the collected per-disk errors, nil otherwise.
//
// NOTE(review): nil disks are skipped and leave a nil entry in the
// error slice handed to isQuorum - confirm that isQuorum is meant to
// see those as nil.
func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash, writeQuorum int) (err error) {
	// One error slot per disk, each goroutine only touches its own.
	writeErrs := make([]error, len(disks))

	var pending sync.WaitGroup
	for diskIdx, disk := range disks {
		if disk == nil {
			continue
		}
		pending.Add(1)
		// Append to this disk in its own routine.
		go func(diskIdx int, disk StorageAPI) {
			defer pending.Done()

			// Look up the block assigned to this disk.
			shardIdx := distribution[diskIdx] - 1
			shard := enBlocks[shardIdx]
			if appendErr := disk.AppendFile(volume, path, shard); appendErr != nil {
				writeErrs[diskIdx] = appendErr
				return
			}

			// Only blocks that were appended successfully are hashed;
			// the error slot keeps its nil zero value.
			hashWriters[shardIdx].Write(shard)
		}(diskIdx, disk)
	}

	// Block until every append has finished.
	pending.Wait()

	// Verify write quorum.
	if isQuorum(writeErrs, writeQuorum) {
		return nil
	}
	return toObjectErr(errXLWriteQuorum, volume, path)
}