From 89587e1391a1c0ceec37339ff35c49990005ac8e Mon Sep 17 00:00:00 2001 From: "Frederick F. Kautz IV" Date: Mon, 1 Dec 2014 18:51:47 -0800 Subject: [PATCH] Adding file splitter and streaming byte chunker --- pkgs/split/.gitignore | 3 +- pkgs/split/TESTFILE | Bin 262144 -> 3893 bytes pkgs/split/split.go | 102 ++++++++++++++++++++++++++++++++++++++- pkgs/split/split_test.go | 34 ++++++++++++- 4 files changed, 135 insertions(+), 4 deletions(-) diff --git a/pkgs/split/.gitignore b/pkgs/split/.gitignore index 411512991..5ff624766 100644 --- a/pkgs/split/.gitignore +++ b/pkgs/split/.gitignore @@ -1 +1,2 @@ -TESTFILE.* \ No newline at end of file +TESTFILE.* +TESTPREFIX.* diff --git a/pkgs/split/TESTFILE b/pkgs/split/TESTFILE index 6d23118f0d0084657a974875123ddc1b9a0738dd..1179824569dcb14413904cb2b5cb036a9551024d 100644 GIT binary patch literal 3893 zcmW;P$&te_3;@ADqcIO$`6srm{j+R&8Uo9Eopsk+v2xYwHEaLY-fN$=@7izeSUcCQ zwR`PZ=eN#Y=d5$rdF#YFxlXOq>&&{pb@#eH@~->VjdgR~TDRAo^?vK^_0D>}@vRr@ z<$ASVuQw}xEB1=B;;!&Ou_9O0ie51*e=GOOv+}NdD?M+ntd+fTR{d7(RcF;*^;X3y zKc`mps#*P8y;q;rclBExt8=wq+pA~IZ_QqF*4#C3O{~c^wMLAuh8#aVeuDfI`APEA z=l-)f_l5h)ed)e-U%ap0mruVf;8t);xHa4&ZWXtTTgNTrR&q;&1i0{9FGGU=!E~HiHde zQ`i_bhYezr*eEuO4P(>TI5v;NH`L!$e`Ed4^*7i&4v)j*@HjjUkHh2eI6MxI!{hKcJPwb;F7c1dremJc38?2p+*Bcm$8&5j=uN z@CY8kBX|Ul;1S-B;1N86NAL(9!6SGCkKhqJf=BQO9>F7c1dremJc38?2p+*Bcm$8& z5j=uN@CY8kBX|Ul;1N86NAL(9!6SGCkKhqJf=BQO9>F7c1dremJc38?2p+*BcqEVH zkvx(|@<<-ZBY7l`$s>6rkK~a&l1K7L9?2tlB#-2gJd#K9NFK=}c_feI zkvx(|@<<-ZBY7l`$s>6rkK~a&l1K7L9?2tlB#-2gJd#K9NFK=}c_feI zkvx(|@<<-ZBY7l`$s>6rkK~a&l1K7L9?2tlB#-2gJd#K9NFK=}c_feI zkvx(|@hBd}qj(gL;!!+`NAV~g#iMu>kK$20ibwG%9>t@06p!LjJc>v0C?3V5codJ~ zQ9O!A@hBd}qj(gL;!!+`NAV~g#iMu>kK$20ibwG%9>t@06p!LjJc>v0C?3V5codJ~ zQ9O!A@hBd}qj(gL;!!+`NAV~g#iMu>kK$20ibwG%9>t@06p!LjJc>v0C?3V5codJ~ zQ9O!A@hBd}qj)rr=FvQwNAqYN&7*lVkLJ-lnn&|!9?he9G>_)dJeo)IXdca@c{Gpa z(L9<*^JpH;qj@xs=FvQwNAqYN&7*lVkLJ-lnn&|!9?he9G>_)dJeo)IXdca@c{Gpa z(L9<*^JpH;qj@xs=FvQwNAqYN&7*lVkLJ-lnn&|!9?he9G>_)dJeo)IXdca@c{Gpa z(L9<*^JpH;qj@xs=FvQw$M6^)!((_1kKr*qhR5(29>ZgJ43FV4Jch^c7#_o8cnpu> zF+7IH@E9J$V|WaY;W0dh$M6^)!((_1kKr*qhR5(29>ZgJ43FV4Jch^c7#_o8cnpu> zF+7IH@E9J$V|WaY;W0dh$M6^)!((_1kKr*qhR5(29>ZgJ43FV4Jch^c7#_o8cnpu> fF+7IH@E9J$V|WaY;W0dh$M6^)!(({(KlodJo)3Cl literal 262144 zcmeIuF#!Mo0K%a4Pwj07h(KY$fB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA Wz<>b*1`HT5V8DO@0|pEj_<;ccJOBXz diff --git a/pkgs/split/split.go b/pkgs/split/split.go index 64dc7cae2..fd50359b3 100644 --- a/pkgs/split/split.go +++ b/pkgs/split/split.go @@ -25,9 +25,16 @@ package split // #include "split.h" import "C" import ( + "bufio" + "bytes" "errors" - "github.com/minio-io/minio/pkgs/strbyteconv" + "io" + "io/ioutil" + "os" + "strconv" "unsafe" + + "github.com/minio-io/minio/pkgs/strbyteconv" ) type Split struct { @@ -51,3 +58,96 @@ func (b *Split) GenChunks(bname string, bytestr string) error { } return nil } + +type GoSplit struct { + file string + offset uint64 +} + +// SplitStream reads from io.Reader, splits the data into chunks, and sends +// each chunk to the channel. This method runs until an EOF or error occurs. If +// an error occurs, the method sends the error over the channel and returns. +// Before returning, the channel is always closed. +// +// The user should run this as a gorountine and retrieve the data over the +// channel. +// +// channel := make(chan ByteMessage) +// go SplitStream(reader, chunkSize, channel) +// for chunk := range channel { +// log.Println(chunk.Data) +// } +func SplitStream(reader io.Reader, chunkSize uint64, ch chan ByteMessage) { + // we read until EOF or another error + var readError error + + // run this until an EOF or error occurs + for readError == nil { + // keep track of how much data has been read + var totalRead uint64 + // Create a buffer to write the current chunk into + var bytesBuffer bytes.Buffer + bytesWriter := bufio.NewWriter(&bytesBuffer) + // read a full chunk + for totalRead < chunkSize && readError == nil { + var currentRead int + // if we didn't read a full chunk, we should attempt to read again. + // We create a byte array representing how much space is left + // unwritten in the given chunk + chunk := make([]byte, chunkSize-totalRead) + currentRead, readError = reader.Read(chunk) + // keep track of how much we have read in total + totalRead = totalRead + uint64(currentRead) + // prune the array to only what has been read, write to chunk buffer + chunk = chunk[0:currentRead] + bytesWriter.Write(chunk) + } + // flush stream to underlying byte buffer + bytesWriter.Flush() + // if we have data available, send it over the channel + if bytesBuffer.Len() != 0 { + ch <- ByteMessage{bytesBuffer.Bytes(), nil} + } + } + // if we have an error other than an EOF, send it over the channel + if readError != io.EOF { + ch <- ByteMessage{nil, readError} + } + // close the channel, signaling the channel reader that the stream is complete + close(ch) +} + +// Message structure for results from the SplitStream goroutine +type ByteMessage struct { + Data []byte + Err error +} + +// Takes a file and splits it into chunks with size chunkSize. The output +// filename is given with outputPrefix. +func SplitFilesWithPrefix(filename string, chunkSize uint64, outputPrefix string) error { + // open file + file, err := os.Open(filename) + if err != nil { + return err + } + // start stream splitting goroutine + ch := make(chan ByteMessage) + go SplitStream(file, chunkSize, ch) + + // used to write each chunk out as a separate file. {{outputPrefix}}.{{i}} + i := 0 + + // write each chunk out to a separate file + for chunk := range ch { + if chunk.Err != nil { + return chunk.Err + } + err := ioutil.WriteFile(outputPrefix+"."+strconv.Itoa(i), chunk.Data, 0600) + if err != nil { + return err + } + i = i + 1 + } + return nil +} diff --git a/pkgs/split/split_test.go b/pkgs/split/split_test.go index 62dc28ebd..33b5dc658 100644 --- a/pkgs/split/split_test.go +++ b/pkgs/split/split_test.go @@ -17,8 +17,13 @@ package split import ( - . "gopkg.in/check.v1" + "bufio" + "bytes" + "log" + "strconv" "testing" + + . "gopkg.in/check.v1" ) type MySuite struct{} @@ -29,6 +34,31 @@ func Test(t *testing.T) { TestingT(t) } func (s *MySuite) TestFileSplit(c *C) { b := Split{} - err := b.GenChunks("TESTFILE", "20KB") + err := b.GenChunks("TESTFILE", "1KB") + c.Assert(err, IsNil) +} + +func (s *MySuite) TestSplitStream(c *C) { + var bytesBuffer bytes.Buffer + bytesWriter := bufio.NewWriter(&bytesBuffer) + for i := 0; i < 100; i++ { + bytesWriter.Write([]byte(strconv.Itoa(i))) + } + bytesWriter.Flush() + log.Println(strconv.Itoa(bytesBuffer.Len())) + ch := make(chan ByteMessage) + reader := bytes.NewReader(bytesBuffer.Bytes()) + go SplitStream(reader, 25, ch) + var resultsBuffer bytes.Buffer + resultsWriter := bufio.NewWriter(&resultsBuffer) + for chunk := range ch { + resultsWriter.Write(chunk.Data) + } + resultsWriter.Flush() + c.Assert(bytes.Compare(bytesBuffer.Bytes(), resultsBuffer.Bytes()), Equals, 0) +} + +func (s *MySuite) TestFileSplit2(c *C) { + err := SplitFilesWithPrefix("TESTFILE", 1024, "TESTPREFIX") c.Assert(err, IsNil) }