From b5d84790a2d198d86515386c271ecb40c83f7b30 Mon Sep 17 00:00:00 2001 From: "Frederick F. Kautz IV" Date: Sun, 4 Jan 2015 14:46:51 +1300 Subject: [PATCH] Further simplifying merging files. Eliminated a structure, switched to PipeWriter --- pkg/split/split.go | 91 +++++++++-------------------------------- pkg/split/split_test.go | 14 +++++-- 2 files changed, 30 insertions(+), 75 deletions(-) diff --git a/pkg/split/split.go b/pkg/split/split.go index 451429f75..5319036bc 100644 --- a/pkg/split/split.go +++ b/pkg/split/split.go @@ -22,7 +22,6 @@ import ( "bufio" "bytes" "errors" - "github.com/minio-io/minio/pkg/strbyteconv" "io" "io/ioutil" "os" @@ -101,93 +100,48 @@ func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan SplitMessa close(ch) } -func JoinStream(dirname string, inputPrefix string) <-chan JoinMessage { - ch := make(chan JoinMessage) - go joinStreamGoRoutine(dirname, inputPrefix, ch) - return ch -} - -func joinStreamGoRoutine(dirname string, inputPrefix string, ch chan JoinMessage) { - var readError error - - var bytesBuffer bytes.Buffer - bytesWriter := bufio.NewWriter(&bytesBuffer) - // read a full directory +func JoinFiles(dirname string, inputPrefix string) io.Reader { + reader, writer := io.Pipe() fileInfos, readError := ioutil.ReadDir(dirname) if readError != nil { - ch <- JoinMessage{nil, 0, readError} + writer.CloseWithError(readError) } var newfileInfos []os.FileInfo for _, fi := range fileInfos { if strings.Contains(fi.Name(), inputPrefix) == true { newfileInfos = append(newfileInfos, fi) - continue } } if len(newfileInfos) == 0 { - ch <- JoinMessage{nil, 0, errors.New("no files found for given prefix")} - } - - for i := range newfileInfos { - slice, err := ioutil.ReadFile(newfileInfos[i].Name()) - if err != nil { - ch <- JoinMessage{nil, 0, err} - } - bytesWriter.Write(slice) - bytesWriter.Flush() - if bytesBuffer.Len() != 0 { - ch <- JoinMessage{&bytesBuffer, newfileInfos[i].Size(), nil} - } + writer.CloseWithError(errors.New("no files found for given prefix")) } - // close the channel, signaling the channel reader that the stream is complete - close(ch) + go joinFilesGoRoutine(newfileInfos, writer) + return reader } -func JoinFilesWithPrefix(dirname string, inputPrefix string, outputFile string) error { - if dirname == "" { - return errors.New("Invalid directory") - } - - if inputPrefix == "" { - return errors.New("Invalid argument inputPrefix cannot be empty string") - } - - if outputFile == "" { - return errors.New("Invalid output file") - } - - ch := JoinStream(dirname, inputPrefix) - - var multiReaders []io.Reader - var aggregatedLength int64 - for output := range ch { - if output.Err != nil { - return output.Err +func joinFilesGoRoutine(fileInfos []os.FileInfo, writer *io.PipeWriter) { + for _, fileInfo := range fileInfos { + file, err := os.Open(fileInfo.Name()) + defer file.Close() + for err != nil { + writer.CloseWithError(err) + return + } + _, err = io.Copy(writer, file) + if err != nil { + writer.CloseWithError(err) + return } - multiReaders = append(multiReaders, output.Reader) - aggregatedLength += output.Length - } - - newReader := io.MultiReader(multiReaders...) - aggregatedBytes := make([]byte, aggregatedLength) - _, err := newReader.Read(aggregatedBytes) - if err != nil { - return err - } - - err = ioutil.WriteFile(outputFile, aggregatedBytes, 0600) - if err != nil { - return err } - return nil + writer.Close() } // Takes a file and splits it into chunks with size chunkSize. The output // filename is given with outputPrefix. -func SplitFilesWithPrefix(filename string, chunkstr string, outputPrefix string) error { +func SplitFileWithPrefix(filename string, chunkSize uint64, outputPrefix string) error { // open file file, err := os.Open(filename) defer file.Close() @@ -199,11 +153,6 @@ func SplitFilesWithPrefix(filename string, chunkstr string, outputPrefix string) return errors.New("Invalid argument outputPrefix cannot be empty string") } - chunkSize, err := strbyteconv.StringToBytes(chunkstr) - if err != nil { - return err - } - // start stream splitting goroutine ch := SplitStream(file, chunkSize) diff --git a/pkg/split/split_test.go b/pkg/split/split_test.go index 3ccddafec..6ef7663a0 100644 --- a/pkg/split/split_test.go +++ b/pkg/split/split_test.go @@ -19,6 +19,8 @@ package split import ( "bufio" "bytes" + "io" + "os" "strconv" "testing" @@ -50,13 +52,17 @@ func (s *MySuite) TestSplitStream(c *C) { } func (s *MySuite) TestFileSplitJoin(c *C) { - err := SplitFilesWithPrefix("TESTFILE", "1KB", "TESTPREFIX") + err := SplitFileWithPrefix("TESTFILE", 1024, "TESTPREFIX") c.Assert(err, IsNil) - err = SplitFilesWithPrefix("TESTFILE", "1KB", "") + err = SplitFileWithPrefix("TESTFILE", 1024, "") c.Assert(err, Not(IsNil)) - err = JoinFilesWithPrefix(".", "TESTPREFIX", "") + devnull, err := os.OpenFile(os.DevNull, 2, os.ModeAppend) + defer devnull.Close() + reader := JoinFiles(".", "ERROR") + _, err = io.Copy(devnull, reader) c.Assert(err, Not(IsNil)) - err = JoinFilesWithPrefix(".", "TESTPREFIX", "NEWFILE") + reader = JoinFiles(".", "TESTPREFIX") + _, err = io.Copy(devnull, reader) c.Assert(err, IsNil) }