diff --git a/pkgs/split/split.go b/pkgs/split/split.go index de410d174..63cea5d10 100644 --- a/pkgs/split/split.go +++ b/pkgs/split/split.go @@ -55,7 +55,13 @@ type JoinMessage struct { // for chunk := range channel { // log.Println(chunk.Data) // } -func SplitStream(reader io.Reader, chunkSize uint64, ch chan SplitMessage) { +func SplitStream(reader io.Reader, chunkSize uint64) <-chan SplitMessage { + ch := make(chan SplitMessage) + go splitStreamGoRoutine(reader, chunkSize, ch) + return ch +} + +func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan SplitMessage) { // we read until EOF or another error var readError error @@ -95,7 +101,13 @@ func SplitStream(reader io.Reader, chunkSize uint64, ch chan SplitMessage) { close(ch) } -func JoinStream(dirname string, inputPrefix string, ch chan JoinMessage) { +func JoinStream(dirname string, inputPrefix string) <-chan JoinMessage { + ch := make(chan JoinMessage) + go joinStreamGoRoutine(dirname, inputPrefix, ch) + return ch +} + +func joinStreamGoRoutine(dirname string, inputPrefix string, ch chan JoinMessage) { var readError error var bytesBuffer bytes.Buffer @@ -147,8 +159,7 @@ func JoinFilesWithPrefix(dirname string, inputPrefix string, outputFile string) return errors.New("Invalid output file") } - ch := make(chan JoinMessage) - go JoinStream(dirname, inputPrefix, ch) + ch := JoinStream(dirname, inputPrefix) var multiReaders []io.Reader var aggregatedLength int64 @@ -194,8 +205,7 @@ func SplitFilesWithPrefix(filename string, chunkstr string, outputPrefix string) } // start stream splitting goroutine - ch := make(chan SplitMessage) - go SplitStream(file, chunkSize, ch) + ch := SplitStream(file, chunkSize) // used to write each chunk out as a separate file. {{outputPrefix}}.{{i}} i := 0 diff --git a/pkgs/split/split_test.go b/pkgs/split/split_test.go index cf0733fdf..3ccddafec 100644 --- a/pkgs/split/split_test.go +++ b/pkgs/split/split_test.go @@ -38,9 +38,8 @@ func (s *MySuite) TestSplitStream(c *C) { bytesWriter.Write([]byte(strconv.Itoa(i))) } bytesWriter.Flush() - ch := make(chan SplitMessage) reader := bytes.NewReader(bytesBuffer.Bytes()) - go SplitStream(reader, 25, ch) + ch := SplitStream(reader, 25) var resultsBuffer bytes.Buffer resultsWriter := bufio.NewWriter(&resultsBuffer) for chunk := range ch { diff --git a/pkgs/storage/encodedstorage/encoded_storage.go b/pkgs/storage/encodedstorage/encoded_storage.go index a58dc8b37..637c3754a 100644 --- a/pkgs/storage/encodedstorage/encoded_storage.go +++ b/pkgs/storage/encodedstorage/encoded_storage.go @@ -136,8 +136,7 @@ func (eStorage *encodedStorage) List(objectPath string) ([]storage.ObjectDescrip func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error { // split - chunks := make(chan split.SplitMessage) - go split.SplitStream(object, eStorage.BlockSize, chunks) + chunks := split.SplitStream(object, eStorage.BlockSize) // for each chunk encoderParameters, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY)