Improving EncoderStream to return error only upon non io.EOF.

io.EOF is okay since io.ReadFull will not have read any bytes at all.

Also making error channel receive only for go routine.
master
Harshavardhana 9 years ago
parent 4ac23d747c
commit e082f26e10
  1. 7
      pkg/donut/bucket.go
  2. 2
      pkg/donut/split/.gitignore
  3. 179
      pkg/donut/split/split.go
  4. 72
      pkg/donut/split/split_test.go
  5. 1000
      pkg/donut/split/testdata/TESTFILE
  6. 12
      pkg/erasure/erasure_encode.go

@ -447,14 +447,13 @@ func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData
}
for blockIndex, block := range encodedBlocks {
errCh := make(chan error, 1)
go func(writer io.Writer, reader io.Reader) {
// FIXME: this closes the errCh in the outer scope
go func(writer io.Writer, reader io.Reader, errCh chan<- error) {
defer close(errCh)
_, err := io.Copy(writer, reader)
errCh <- err
}(writers[blockIndex], bytes.NewReader(block))
}(writers[blockIndex], bytes.NewReader(block), errCh)
if err := <-errCh; err != nil {
// FIXME: fix premature return in case of err != nil
// Returning error is fine here CleanupErrors() would cleanup writers
return 0, 0, iodine.New(err, nil)
}
}

@ -1,2 +0,0 @@
TESTPREFIX.*
NEWFILE

@ -1,179 +0,0 @@
/*
* Minio Cloud Storage, (C) 2014 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package split
import (
"bufio"
"bytes"
"errors"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"github.com/minio/minio/pkg/iodine"
)
// Message - message structure for results from the Stream goroutine
type Message struct {
Data []byte
Err error
}
// Stream reads from io.Reader, splits the data into chunks, and sends
// each chunk to the channel. This method runs until an EOF or error occurs. If
// an error occurs, the method sends the error over the channel and returns.
// Before returning, the channel is always closed.
//
// The user should run this as a gorountine and retrieve the data over the
// channel.
//
// channel := make(chan Message)
// go Stream(reader, chunkSize, channel)
// for chunk := range channel {
// log.Println(chunk.Data)
// }
func Stream(reader io.Reader, chunkSize uint64) <-chan Message {
ch := make(chan Message)
go splitStreamGoRoutine(reader, chunkSize, ch)
return ch
}
func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan<- Message) {
defer close(ch)
// we read until EOF or another error
var readError error
// run this until an EOF or error occurs
for readError == nil {
// keep track of how much data has been read
var totalRead uint64
// Create a buffer to write the current chunk into
var bytesBuffer bytes.Buffer
bytesWriter := bufio.NewWriter(&bytesBuffer)
// read a full chunk
for totalRead < chunkSize && readError == nil {
var currentRead int
// if we didn't read a full chunk, we should attempt to read again.
// We create a byte array representing how much space is left
// unwritten in the given chunk
chunk := make([]byte, chunkSize-totalRead)
currentRead, readError = reader.Read(chunk)
// keep track of how much we have read in total
totalRead = totalRead + uint64(currentRead)
// prune the array to only what has been read, write to chunk buffer
chunk = chunk[0:currentRead]
bytesWriter.Write(chunk)
}
// flush stream to underlying byte buffer
bytesWriter.Flush()
// if we have data available, send it over the channel
if bytesBuffer.Len() != 0 {
ch <- Message{bytesBuffer.Bytes(), nil}
}
}
// if we have an error other than an EOF, send it over the channel
if readError != io.EOF {
ch <- Message{nil, readError}
}
}
// JoinFiles reads from a given directory, joins data in chunks with prefix and sends
// an io.Reader.
//
// var err error
// for err == nil {
// buf := make([]byte, 1024*1024)
// reader := JoinFiles("mydirectory", "mypreferred-prefix")
// _, err = reader.Read(buf)
// fmt.Println(buf)
// }
//
func JoinFiles(dirname string, inputPrefix string) io.Reader {
reader, writer := io.Pipe()
fileInfos, readError := ioutil.ReadDir(dirname)
if readError != nil {
writer.CloseWithError(readError)
return nil
}
var newfileInfos []os.FileInfo
for _, fi := range fileInfos {
if strings.Contains(fi.Name(), inputPrefix) == true {
newfileInfos = append(newfileInfos, fi)
}
}
if len(newfileInfos) == 0 {
nofilesError := iodine.New(errors.New("no files found for given prefix "+inputPrefix), nil)
writer.CloseWithError(nofilesError)
return nil
}
go joinFilesInGoRoutine(newfileInfos, writer)
return reader
}
func joinFilesInGoRoutine(fileInfos []os.FileInfo, writer *io.PipeWriter) {
for _, fileInfo := range fileInfos {
file, err := os.Open(fileInfo.Name())
defer file.Close()
for err != nil {
writer.CloseWithError(err)
return
}
_, err = io.Copy(writer, file)
if err != nil {
writer.CloseWithError(err)
return
}
}
writer.Close()
}
// FileWithPrefix - Takes a file and splits it into chunks with size chunkSize. The output
// filename is given with outputPrefix.
func FileWithPrefix(filename string, chunkSize uint64, outputPrefix string) error {
// open file
file, err := os.Open(filename)
defer file.Close()
if err != nil {
return err
}
if outputPrefix == "" {
return errors.New("Invalid argument outputPrefix cannot be empty string")
}
// used to write each chunk out as a separate file. {{outputPrefix}}.{{i}}
i := 0
// write each chunk out to a separate file
for chunk := range Stream(file, chunkSize) {
if chunk.Err != nil {
return chunk.Err
}
err := ioutil.WriteFile(outputPrefix+"."+strconv.Itoa(i), chunk.Data, 0600)
if err != nil {
return err
}
i = i + 1
}
return nil
}

@ -1,72 +0,0 @@
/*
* Minio Cloud Storage, (C) 2014 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package split_test
import (
"bufio"
"bytes"
"io"
"os"
"strconv"
"testing"
"github.com/minio/minio/pkg/donut/split"
. "gopkg.in/check.v1"
)
type MySuite struct{}
var _ = Suite(&MySuite{})
func Test(t *testing.T) { TestingT(t) }
func (s *MySuite) TestSplitStream(c *C) {
var bytesBuffer bytes.Buffer
bytesWriter := bufio.NewWriter(&bytesBuffer)
for i := 0; i < 100; i++ {
bytesWriter.Write([]byte(strconv.Itoa(i)))
}
bytesWriter.Flush()
reader := bytes.NewReader(bytesBuffer.Bytes())
ch := split.Stream(reader, 25)
var resultsBuffer bytes.Buffer
resultsWriter := bufio.NewWriter(&resultsBuffer)
for chunk := range ch {
resultsWriter.Write(chunk.Data)
}
resultsWriter.Flush()
c.Assert(bytes.Compare(bytesBuffer.Bytes(), resultsBuffer.Bytes()), Equals, 0)
}
func (s *MySuite) TestFileSplitJoin(c *C) {
err := split.FileWithPrefix("testdata/TESTFILE", 1024, "TESTPREFIX")
c.Assert(err, IsNil)
err = split.FileWithPrefix("testdata/TESTFILE", 1024, "")
c.Assert(err, Not(IsNil))
devnull, err := os.OpenFile(os.DevNull, 2, os.ModeAppend)
defer devnull.Close()
var reader io.Reader
reader = split.JoinFiles(".", "ERROR")
c.Assert(reader, IsNil)
reader = split.JoinFiles(".", "TESTPREFIX")
c.Assert(reader, Not(IsNil))
_, err = io.Copy(devnull, reader)
c.Assert(err, IsNil)
}

File diff suppressed because it is too large Load Diff

@ -171,7 +171,7 @@ func (e *Erasure) Encode(inputData []byte) (encodedBlocks [][]byte, err error) {
// Allocate memory to the "encoded blocks" return buffer
encodedBlocks = make([][]byte, n) // Return buffer
// Nessary to bridge Go to the C world. C requires 2D arry of pointers to
// Neccessary to bridge Go to the C world. C requires 2D arry of pointers to
// byte array. "encodedBlocks" is a 2D slice.
pointersToEncodedBlock := make([]*byte, n) // Pointers to encoded blocks.
@ -211,17 +211,23 @@ func (e *Erasure) EncodeStream(data io.Reader, size int64) ([][]byte, []byte, er
// Length of total number of "n" data chunks
encodedDataBlocksLen := encodedBlockLen * n
// allocate byte array for encodedBlock length
inputData := make([]byte, size, encodedDataBlocksLen)
_, err := io.ReadFull(data, inputData)
if err != nil {
return nil, nil, err
// do not check for io.ErrUnexpectedEOF, we know the right amount of size
// to be read if its a short read we need to throw error since reader could
// have been prematurely closed.
if err != io.EOF {
return nil, nil, err
}
}
// Allocate memory to the "encoded blocks" return buffer
encodedBlocks := make([][]byte, n) // Return buffer
// Nessary to bridge Go to the C world. C requires 2D arry of pointers to
// Neccessary to bridge Go to the C world. C requires 2D arry of pointers to
// byte array. "encodedBlocks" is a 2D slice.
pointersToEncodedBlock := make([]*byte, n) // Pointers to encoded blocks.

Loading…
Cancel
Save