You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
minio/cmd/erasure-createfile.go

146 lines
4.2 KiB

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"encoding/hex"
"hash"
"io"
"sync"
"github.com/klauspost/reedsolomon"
)
// erasureCreateFile - writes an entire stream by erasure coding to
// all the disks, writes also calculate individual block's checksum
// for future bit-rot protection.
func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, allowEmpty bool, blockSize int64, dataBlocks int, parityBlocks int, algo string, writeQuorum int) (bytesWritten int64, checkSums []string, err error) {
// Allocated blockSized buffer for reading from incoming stream.
buf := make([]byte, blockSize)
hashWriters := newHashWriters(len(disks), algo)
// Read until io.EOF, erasure codes data and writes to all disks.
for {
var blocks [][]byte
n, rErr := io.ReadFull(reader, buf)
// FIXME: this is a bug in Golang, n == 0 and err ==
// io.ErrUnexpectedEOF for io.ReadFull function.
if n == 0 && rErr == io.ErrUnexpectedEOF {
return 0, nil, traceError(rErr)
}
if rErr == io.EOF {
// We have reached EOF on the first byte read, io.Reader
// must be 0bytes, we don't need to erasure code
// data. Will create a 0byte file instead.
if bytesWritten == 0 && allowEmpty {
blocks = make([][]byte, len(disks))
rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum)
if rErr != nil {
return 0, nil, rErr
}
} // else we have reached EOF after few reads, no need to
// add an additional 0bytes at the end.
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
return 0, nil, traceError(rErr)
}
if n > 0 {
// Returns encoded blocks.
var enErr error
blocks, enErr = encodeData(buf[0:n], dataBlocks, parityBlocks)
if enErr != nil {
return 0, nil, enErr
}
// Write to all disks.
if err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil {
return 0, nil, err
}
bytesWritten += int64(n)
}
}
checkSums = make([]string, len(disks))
for i := range checkSums {
checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil))
}
return bytesWritten, checkSums, nil
}
// encodeData - encodes incoming data buffer into
// dataBlocks+parityBlocks returns a 2 dimensional byte array.
func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, error) {
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil {
return nil, traceError(err)
}
// Split the input buffer into data and parity blocks.
var blocks [][]byte
blocks, err = rs.Split(dataBuffer)
if err != nil {
return nil, traceError(err)
}
// Encode parity blocks using data blocks.
err = rs.Encode(blocks)
if err != nil {
return nil, traceError(err)
}
// Return encoded blocks.
return blocks, nil
}
// appendFile - append data buffer at path.
func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hashWriters []hash.Hash, writeQuorum int) (err error) {
var wg = &sync.WaitGroup{}
var wErrs = make([]error, len(disks))
// Write encoded data to quorum disks in parallel.
for index, disk := range disks {
if disk == nil {
continue
}
wg.Add(1)
// Write encoded data in routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
wErr := disk.AppendFile(volume, path, enBlocks[index])
if wErr != nil {
wErrs[index] = traceError(wErr)
return
}
// Calculate hash for each blocks.
hashWriters[index].Write(enBlocks[index])
// Successfully wrote.
wErrs[index] = nil
}(index, disk)
}
// Wait for all the appends to finish.
wg.Wait()
// Do we have write quorum?.
if !isDiskQuorum(wErrs, writeQuorum) {
return traceError(errXLWriteQuorum)
}
return reduceWriteQuorumErrs(wErrs, objectOpIgnoredErrs, writeQuorum)
}