From 9a5003dd25240fe9e6ab5a7639cc5fc85138b3d6 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 5 Dec 2014 09:49:07 -0800 Subject: [PATCH] Provide JoinFilesWithPrefix case with Gochannel --- cmd/erasure-demo/encode.go | 2 +- pkgs/split/.gitignore | 2 +- pkgs/split/Makefile | 2 +- pkgs/split/split.go | 107 ++++++++++++++++++++++++++++++++----- pkgs/split/split_test.go | 9 +++- 5 files changed, 104 insertions(+), 18 deletions(-) diff --git a/cmd/erasure-demo/encode.go b/cmd/erasure-demo/encode.go index b33f731ed..808c7811b 100644 --- a/cmd/erasure-demo/encode.go +++ b/cmd/erasure-demo/encode.go @@ -48,7 +48,7 @@ func encode(c *cli.Context) { } } else { chunkCount := 0 - splitChannel := make(chan split.ByteMessage) + splitChannel := make(chan split.SplitMessage) inputReader := bytes.NewReader(input) go split.SplitStream(inputReader, config.blockSize, splitChannel) for chunk := range splitChannel { diff --git a/pkgs/split/.gitignore b/pkgs/split/.gitignore index 5ff624766..40a13b57c 100644 --- a/pkgs/split/.gitignore +++ b/pkgs/split/.gitignore @@ -1,2 +1,2 @@ -TESTFILE.* TESTPREFIX.* +NEWFILE diff --git a/pkgs/split/Makefile b/pkgs/split/Makefile index 0f1ee2600..de67111fb 100644 --- a/pkgs/split/Makefile +++ b/pkgs/split/Makefile @@ -8,4 +8,4 @@ test: build @godep go test -race -coverprofile=cover.out clean: - @rm -fv TESTFILE.* TESTPREFIX.* cover.out + @rm -fv NEWFILE TESTPREFIX.* cover.out diff --git a/pkgs/split/split.go b/pkgs/split/split.go index f9c5bcf13..de410d174 100644 --- a/pkgs/split/split.go +++ b/pkgs/split/split.go @@ -22,25 +22,26 @@ import ( "bufio" "bytes" "errors" + "github.com/minio-io/minio/pkgs/strbyteconv" "io" "io/ioutil" "os" "strconv" - - "github.com/minio-io/minio/pkgs/strbyteconv" + "strings" ) -type Split struct { - file string - offset uint64 -} - // Message structure for results from the SplitStream goroutine -type ByteMessage struct { +type SplitMessage struct { Data []byte Err error } +type JoinMessage struct { + Reader io.Reader + Length int64 + Err error +} + // SplitStream reads from io.Reader, splits the data into chunks, and sends // each chunk to the channel. This method runs until an EOF or error occurs. If // an error occurs, the method sends the error over the channel and returns. @@ -49,12 +50,12 @@ type ByteMessage struct { // The user should run this as a gorountine and retrieve the data over the // channel. // -// channel := make(chan ByteMessage) +// channel := make(chan SplitMessage) // go SplitStream(reader, chunkSize, channel) // for chunk := range channel { // log.Println(chunk.Data) // } -func SplitStream(reader io.Reader, chunkSize uint64, ch chan ByteMessage) { +func SplitStream(reader io.Reader, chunkSize uint64, ch chan SplitMessage) { // we read until EOF or another error var readError error @@ -83,22 +84,102 @@ func SplitStream(reader io.Reader, chunkSize uint64, ch chan ByteMessage) { bytesWriter.Flush() // if we have data available, send it over the channel if bytesBuffer.Len() != 0 { - ch <- ByteMessage{bytesBuffer.Bytes(), nil} + ch <- SplitMessage{bytesBuffer.Bytes(), nil} } } // if we have an error other than an EOF, send it over the channel if readError != io.EOF { - ch <- ByteMessage{nil, readError} + ch <- SplitMessage{nil, readError} } // close the channel, signaling the channel reader that the stream is complete close(ch) } +func JoinStream(dirname string, inputPrefix string, ch chan JoinMessage) { + var readError error + + var bytesBuffer bytes.Buffer + bytesWriter := bufio.NewWriter(&bytesBuffer) + // read a full directory + fileInfos, readError := ioutil.ReadDir(dirname) + if readError != nil { + ch <- JoinMessage{nil, 0, readError} + } + + var newfileInfos []os.FileInfo + for _, fi := range fileInfos { + if strings.Contains(fi.Name(), inputPrefix) == true { + newfileInfos = append(newfileInfos, fi) + continue + } + } + + if len(newfileInfos) == 0 { + ch <- JoinMessage{nil, 0, errors.New("no files found for given prefix")} + } + + for i := range newfileInfos { + slice, err := ioutil.ReadFile(newfileInfos[i].Name()) + if err != nil { + ch <- JoinMessage{nil, 0, err} + } + bytesWriter.Write(slice) + bytesWriter.Flush() + if bytesBuffer.Len() != 0 { + ch <- JoinMessage{&bytesBuffer, newfileInfos[i].Size(), nil} + } + } + + // close the channel, signaling the channel reader that the stream is complete + close(ch) +} + +func JoinFilesWithPrefix(dirname string, inputPrefix string, outputFile string) error { + if dirname == "" { + return errors.New("Invalid directory") + } + + if inputPrefix == "" { + return errors.New("Invalid argument inputPrefix cannot be empty string") + } + + if outputFile == "" { + return errors.New("Invalid output file") + } + + ch := make(chan JoinMessage) + go JoinStream(dirname, inputPrefix, ch) + + var multiReaders []io.Reader + var aggregatedLength int64 + for output := range ch { + if output.Err != nil { + return output.Err + } + multiReaders = append(multiReaders, output.Reader) + aggregatedLength += output.Length + } + + newReader := io.MultiReader(multiReaders...) + aggregatedBytes := make([]byte, aggregatedLength) + _, err := newReader.Read(aggregatedBytes) + if err != nil { + return err + } + + err = ioutil.WriteFile(outputFile, aggregatedBytes, 0600) + if err != nil { + return err + } + return nil +} + // Takes a file and splits it into chunks with size chunkSize. The output // filename is given with outputPrefix. func SplitFilesWithPrefix(filename string, chunkstr string, outputPrefix string) error { // open file file, err := os.Open(filename) + defer file.Close() if err != nil { return err } @@ -113,7 +194,7 @@ func SplitFilesWithPrefix(filename string, chunkstr string, outputPrefix string) } // start stream splitting goroutine - ch := make(chan ByteMessage) + ch := make(chan SplitMessage) go SplitStream(file, chunkSize, ch) // used to write each chunk out as a separate file. {{outputPrefix}}.{{i}} diff --git a/pkgs/split/split_test.go b/pkgs/split/split_test.go index 37324463f..22ed1f39c 100644 --- a/pkgs/split/split_test.go +++ b/pkgs/split/split_test.go @@ -40,7 +40,7 @@ func (s *MySuite) TestSplitStream(c *C) { } bytesWriter.Flush() log.Println(strconv.Itoa(bytesBuffer.Len())) - ch := make(chan ByteMessage) + ch := make(chan SplitMessage) reader := bytes.NewReader(bytesBuffer.Bytes()) go SplitStream(reader, 25, ch) var resultsBuffer bytes.Buffer @@ -52,9 +52,14 @@ func (s *MySuite) TestSplitStream(c *C) { c.Assert(bytes.Compare(bytesBuffer.Bytes(), resultsBuffer.Bytes()), Equals, 0) } -func (s *MySuite) TestFileSplit2(c *C) { +func (s *MySuite) TestFileSplitJoin(c *C) { err := SplitFilesWithPrefix("TESTFILE", "1KB", "TESTPREFIX") c.Assert(err, IsNil) err = SplitFilesWithPrefix("TESTFILE", "1KB", "") c.Assert(err, Not(IsNil)) + + err = JoinFilesWithPrefix(".", "TESTPREFIX", "") + c.Assert(err, Not(IsNil)) + err = JoinFilesWithPrefix(".", "TESTPREFIX", "NEWFILE") + c.Assert(err, IsNil) }