diff --git a/pkgs/split/.gitignore b/pkgs/split/.gitignore index 411512991..5ff624766 100644 --- a/pkgs/split/.gitignore +++ b/pkgs/split/.gitignore @@ -1 +1,2 @@ -TESTFILE.* \ No newline at end of file +TESTFILE.* +TESTPREFIX.* diff --git a/pkgs/split/TESTFILE b/pkgs/split/TESTFILE index 6d23118f0..117982456 100644 Binary files a/pkgs/split/TESTFILE and b/pkgs/split/TESTFILE differ diff --git a/pkgs/split/split.go b/pkgs/split/split.go index 64dc7cae2..fd50359b3 100644 --- a/pkgs/split/split.go +++ b/pkgs/split/split.go @@ -25,9 +25,16 @@ package split // #include "split.h" import "C" import ( + "bufio" + "bytes" "errors" - "github.com/minio-io/minio/pkgs/strbyteconv" + "io" + "io/ioutil" + "os" + "strconv" "unsafe" + + "github.com/minio-io/minio/pkgs/strbyteconv" ) type Split struct { @@ -51,3 +58,96 @@ func (b *Split) GenChunks(bname string, bytestr string) error { } return nil } + +type GoSplit struct { + file string + offset uint64 +} + +// SplitStream reads from io.Reader, splits the data into chunks, and sends +// each chunk to the channel. This function runs until an EOF or error occurs. If +// an error occurs, the function sends the error over the channel and returns. +// Before returning, the channel is always closed. +// +// The user should run this as a goroutine and retrieve the data over the +// channel. 
//
//	channel := make(chan ByteMessage)
//	go SplitStream(reader, chunkSize, channel)
//	for chunk := range channel {
//		log.Println(chunk.Data)
//	}
//
// Every chunk holds exactly chunkSize bytes except possibly the last one,
// which holds whatever was read before the stream ended. A chunkSize of zero
// can never be filled, so it is reported as an error message on the channel
// and nothing is read from reader.
func SplitStream(reader io.Reader, chunkSize uint64, ch chan ByteMessage) {
	// Close the channel on every exit path so a ranging receiver terminates.
	defer close(ch)

	// With a zero chunk size the fill loop below would never call Read, so
	// no error (not even io.EOF) could ever end the outer loop.
	if chunkSize == 0 {
		ch <- ByteMessage{Err: errors.New("chunk size must be greater than zero")}
		return
	}

	// Scratch space shared by all reads. Its contents are copied into a
	// per-chunk buffer before being sent, so reuse never aliases sent data.
	scratch := make([]byte, chunkSize)

	// we read until EOF or another error
	var readError error
	for readError == nil {
		// keep track of how much of the current chunk has been filled
		var totalRead uint64
		// each chunk gets its own buffer because the receiver keeps the bytes
		var bytesBuffer bytes.Buffer
		bytesWriter := bufio.NewWriter(&bytesBuffer)

		// A single Read may return fewer bytes than requested, so keep
		// asking for the remainder until the chunk is full or the reader
		// reports EOF or another error.
		for totalRead < chunkSize && readError == nil {
			var currentRead int
			currentRead, readError = reader.Read(scratch[:chunkSize-totalRead])
			totalRead += uint64(currentRead)
			// Bytes returned alongside an error are still valid data.
			bytesWriter.Write(scratch[:currentRead])
		}
		// flush stream to underlying byte buffer
		bytesWriter.Flush()

		// if we have data available, send it over the channel
		if bytesBuffer.Len() != 0 {
			ch <- ByteMessage{Data: bytesBuffer.Bytes()}
		}
	}

	// io.EOF is the normal end of the stream; anything else is reported.
	if readError != io.EOF {
		ch <- ByteMessage{Err: readError}
	}
}

// ByteMessage is the message type SplitStream sends over its channel. A
// message carries either a chunk of data (Err is nil) or an error (Data is
// nil).
type ByteMessage struct {
	Data []byte
	Err  error
}

// SplitFilesWithPrefix takes a file and splits it into chunks with size
// chunkSize. Chunk number i, counting from zero, is written to the file
// named outputPrefix.i.
+func SplitFilesWithPrefix(filename string, chunkSize uint64, outputPrefix string) error { + // open the input file. NOTE(review): nothing in this function calls file.Close() + file, err := os.Open(filename) + if err != nil { + return err + } + // start SplitStream in its own goroutine; it sends each chunk (or an error) on the unbuffered channel ch and then closes it + ch := make(chan ByteMessage) + go SplitStream(file, chunkSize, ch) + + // i is the index of the next chunk, starting at zero; chunk i is written to {{outputPrefix}}.{{i}} + i := 0 + + // write each chunk out to a separate file with mode 0600. NOTE(review): returning on a WriteFile error stops receiving from ch, so SplitStream can stay blocked on its next send + for chunk := range ch { + if chunk.Err != nil { + return chunk.Err + } + err := ioutil.WriteFile(outputPrefix+"."+strconv.Itoa(i), chunk.Data, 0600) + if err != nil { + return err + } + i = i + 1 + } + return nil +} diff --git a/pkgs/split/split_test.go b/pkgs/split/split_test.go index 62dc28ebd..33b5dc658 100644 --- a/pkgs/split/split_test.go +++ b/pkgs/split/split_test.go @@ -17,8 +17,13 @@ package split import ( - . "gopkg.in/check.v1" + "bufio" + "bytes" + "log" + "strconv" "testing" + + . "gopkg.in/check.v1" ) type MySuite struct{} @@ -29,6 +34,31 @@ func Test(t *testing.T) { TestingT(t) } func (s *MySuite) TestFileSplit(c *C) { b := Split{} - err := b.GenChunks("TESTFILE", "20KB") + err := b.GenChunks("TESTFILE", "1KB") + c.Assert(err, IsNil) +} + +func (s *MySuite) TestSplitStream(c *C) { + var bytesBuffer bytes.Buffer + bytesWriter := bufio.NewWriter(&bytesBuffer) + for i := 0; i < 100; i++ { + bytesWriter.Write([]byte(strconv.Itoa(i))) + } + bytesWriter.Flush() + log.Println(strconv.Itoa(bytesBuffer.Len())) + ch := make(chan ByteMessage) + reader := bytes.NewReader(bytesBuffer.Bytes()) + go SplitStream(reader, 25, ch) + var resultsBuffer bytes.Buffer + resultsWriter := bufio.NewWriter(&resultsBuffer) + for chunk := range ch { + resultsWriter.Write(chunk.Data) + } + resultsWriter.Flush() + c.Assert(bytes.Compare(bytesBuffer.Bytes(), resultsBuffer.Bytes()), Equals, 0) +} + +func (s *MySuite) TestFileSplit2(c *C) { + err := SplitFilesWithPrefix("TESTFILE", 1024, "TESTPREFIX") c.Assert(err, IsNil) }