diff --git a/pkg/donut/bucket.go b/pkg/donut/bucket.go
index a6c0ff999..fa8079505 100644
--- a/pkg/donut/bucket.go
+++ b/pkg/donut/bucket.go
@@ -447,14 +447,13 @@ func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData
 	}
 	for blockIndex, block := range encodedBlocks {
 		errCh := make(chan error, 1)
-		go func(writer io.Writer, reader io.Reader) {
-			// FIXME: this closes the errCh in the outer scope
+		go func(writer io.Writer, reader io.Reader, errCh chan<- error) {
 			defer close(errCh)
 			_, err := io.Copy(writer, reader)
 			errCh <- err
-		}(writers[blockIndex], bytes.NewReader(block))
+		}(writers[blockIndex], bytes.NewReader(block), errCh)
 		if err := <-errCh; err != nil {
-			// FIXME: fix premature return in case of err != nil
+			// Returning the error here is fine; CleanupErrors() will clean up the writers.
 			return 0, 0, iodine.New(err, nil)
 		}
 	}
diff --git a/pkg/donut/split/.gitignore b/pkg/donut/split/.gitignore
deleted file mode 100644
index 40a13b57c..000000000
--- a/pkg/donut/split/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-TESTPREFIX.*
-NEWFILE
diff --git a/pkg/donut/split/split.go b/pkg/donut/split/split.go
deleted file mode 100644
index 19d82ffe6..000000000
--- a/pkg/donut/split/split.go
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Minio Cloud Storage, (C) 2014 Minio, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package split
-
-import (
-	"bufio"
-	"bytes"
-	"errors"
-	"io"
-	"io/ioutil"
-	"os"
-	"strconv"
-	"strings"
-
-	"github.com/minio/minio/pkg/iodine"
-)
-
-// Message - message structure for results from the Stream goroutine
-type Message struct {
-	Data []byte
-	Err  error
-}
-
-// Stream reads from io.Reader, splits the data into chunks, and sends
-// each chunk to the channel. This method runs until an EOF or error occurs. If
-// an error occurs, the method sends the error over the channel and returns.
-// Before returning, the channel is always closed.
-//
-// The user should run this as a gorountine and retrieve the data over the
-// channel.
-//
-// channel := make(chan Message)
-// go Stream(reader, chunkSize, channel)
-// for chunk := range channel {
-// log.Println(chunk.Data)
-// }
-func Stream(reader io.Reader, chunkSize uint64) <-chan Message {
-	ch := make(chan Message)
-	go splitStreamGoRoutine(reader, chunkSize, ch)
-	return ch
-}
-
-func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan<- Message) {
-	defer close(ch)
-
-	// we read until EOF or another error
-	var readError error
-
-	// run this until an EOF or error occurs
-	for readError == nil {
-		// keep track of how much data has been read
-		var totalRead uint64
-		// Create a buffer to write the current chunk into
-		var bytesBuffer bytes.Buffer
-		bytesWriter := bufio.NewWriter(&bytesBuffer)
-		// read a full chunk
-		for totalRead < chunkSize && readError == nil {
-			var currentRead int
-			// if we didn't read a full chunk, we should attempt to read again.
-			// We create a byte array representing how much space is left
-			// unwritten in the given chunk
-			chunk := make([]byte, chunkSize-totalRead)
-			currentRead, readError = reader.Read(chunk)
-			// keep track of how much we have read in total
-			totalRead = totalRead + uint64(currentRead)
-			// prune the array to only what has been read, write to chunk buffer
-			chunk = chunk[0:currentRead]
-			bytesWriter.Write(chunk)
-		}
-		// flush stream to underlying byte buffer
-		bytesWriter.Flush()
-		// if we have data available, send it over the channel
-		if bytesBuffer.Len() != 0 {
-			ch <- Message{bytesBuffer.Bytes(), nil}
-		}
-	}
-	// if we have an error other than an EOF, send it over the channel
-	if readError != io.EOF {
-		ch <- Message{nil, readError}
-	}
-}
-
-// JoinFiles reads from a given directory, joins data in chunks with prefix and sends
-// an io.Reader.
-//
-// var err error
-// for err == nil {
-// buf := make([]byte, 1024*1024)
-// reader := JoinFiles("mydirectory", "mypreferred-prefix")
-// _, err = reader.Read(buf)
-// fmt.Println(buf)
-// }
-//
-func JoinFiles(dirname string, inputPrefix string) io.Reader {
-	reader, writer := io.Pipe()
-	fileInfos, readError := ioutil.ReadDir(dirname)
-	if readError != nil {
-		writer.CloseWithError(readError)
-		return nil
-	}
-
-	var newfileInfos []os.FileInfo
-	for _, fi := range fileInfos {
-		if strings.Contains(fi.Name(), inputPrefix) == true {
-			newfileInfos = append(newfileInfos, fi)
-		}
-	}
-
-	if len(newfileInfos) == 0 {
-		nofilesError := iodine.New(errors.New("no files found for given prefix "+inputPrefix), nil)
-		writer.CloseWithError(nofilesError)
-		return nil
-	}
-
-	go joinFilesInGoRoutine(newfileInfos, writer)
-	return reader
-}
-
-func joinFilesInGoRoutine(fileInfos []os.FileInfo, writer *io.PipeWriter) {
-	for _, fileInfo := range fileInfos {
-		file, err := os.Open(fileInfo.Name())
-		defer file.Close()
-		for err != nil {
-			writer.CloseWithError(err)
-			return
-		}
-		_, err = io.Copy(writer, file)
-		if err != nil {
-			writer.CloseWithError(err)
-			return
-		}
-	}
-	writer.Close()
-}
-
-// FileWithPrefix - Takes a file and splits it into chunks with size chunkSize. The output
-// filename is given with outputPrefix.
-func FileWithPrefix(filename string, chunkSize uint64, outputPrefix string) error {
-	// open file
-	file, err := os.Open(filename)
-	defer file.Close()
-	if err != nil {
-		return err
-	}
-
-	if outputPrefix == "" {
-		return errors.New("Invalid argument outputPrefix cannot be empty string")
-	}
-
-	// used to write each chunk out as a separate file. {{outputPrefix}}.{{i}}
-	i := 0
-
-	// write each chunk out to a separate file
-	for chunk := range Stream(file, chunkSize) {
-		if chunk.Err != nil {
-			return chunk.Err
-		}
-		err := ioutil.WriteFile(outputPrefix+"."+strconv.Itoa(i), chunk.Data, 0600)
-		if err != nil {
-			return err
-		}
-		i = i + 1
-	}
-	return nil
-}
diff --git a/pkg/donut/split/split_test.go b/pkg/donut/split/split_test.go
deleted file mode 100644
index 7fa87bbd4..000000000
--- a/pkg/donut/split/split_test.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Minio Cloud Storage, (C) 2014 Minio, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package split_test - -import ( - "bufio" - "bytes" - "io" - "os" - "strconv" - "testing" - - "github.com/minio/minio/pkg/donut/split" - . "gopkg.in/check.v1" -) - -type MySuite struct{} - -var _ = Suite(&MySuite{}) - -func Test(t *testing.T) { TestingT(t) } - -func (s *MySuite) TestSplitStream(c *C) { - var bytesBuffer bytes.Buffer - bytesWriter := bufio.NewWriter(&bytesBuffer) - for i := 0; i < 100; i++ { - bytesWriter.Write([]byte(strconv.Itoa(i))) - } - bytesWriter.Flush() - reader := bytes.NewReader(bytesBuffer.Bytes()) - ch := split.Stream(reader, 25) - var resultsBuffer bytes.Buffer - resultsWriter := bufio.NewWriter(&resultsBuffer) - for chunk := range ch { - resultsWriter.Write(chunk.Data) - } - resultsWriter.Flush() - c.Assert(bytes.Compare(bytesBuffer.Bytes(), resultsBuffer.Bytes()), Equals, 0) -} - -func (s *MySuite) TestFileSplitJoin(c *C) { - err := split.FileWithPrefix("testdata/TESTFILE", 1024, "TESTPREFIX") - c.Assert(err, IsNil) - err = split.FileWithPrefix("testdata/TESTFILE", 1024, "") - c.Assert(err, Not(IsNil)) - - devnull, err := os.OpenFile(os.DevNull, 2, os.ModeAppend) - defer devnull.Close() - - var reader io.Reader - reader = split.JoinFiles(".", "ERROR") - c.Assert(reader, IsNil) - - reader = split.JoinFiles(".", "TESTPREFIX") - c.Assert(reader, Not(IsNil)) - _, err = io.Copy(devnull, reader) - c.Assert(err, IsNil) -} diff --git a/pkg/donut/split/testdata/TESTFILE b/pkg/donut/split/testdata/TESTFILE deleted file mode 100644 index 117982456..000000000 --- a/pkg/donut/split/testdata/TESTFILE +++ /dev/null @@ -1,1000 +0,0 @@ -1 -2 -3 -4 -5 -6 -7 -8 -9 -10 -11 -12 -13 -14 -15 -16 -17 -18 -19 -20 -21 -22 -23 -24 -25 -26 -27 -28 -29 -30 -31 -32 -33 -34 -35 -36 -37 -38 -39 -40 -41 -42 -43 -44 -45 -46 -47 -48 -49 -50 -51 -52 -53 -54 -55 -56 -57 -58 -59 -60 -61 -62 -63 -64 -65 -66 -67 -68 -69 -70 -71 -72 -73 -74 -75 -76 -77 -78 -79 -80 -81 -82 -83 -84 -85 -86 -87 -88 -89 -90 -91 -92 -93 -94 -95 -96 -97 -98 -99 -100 -101 -102 -103 -104 -105 -106 -107 -108 -109 -110 -111 -112 -113 -114 -115 -116 -117 -118 -119 -120 -121 -122 -123 -124 -125 -126 -127 -128 -129 -130 -131 -132 -133 -134 -135 -136 -137 -138 -139 -140 -141 -142 -143 -144 -145 -146 -147 -148 -149 -150 -151 -152 -153 -154 -155 -156 -157 -158 -159 -160 -161 -162 -163 -164 -165 -166 -167 -168 -169 -170 -171 -172 -173 -174 -175 -176 -177 -178 -179 -180 -181 -182 -183 -184 -185 -186 -187 -188 -189 -190 -191 -192 -193 -194 -195 -196 -197 -198 -199 -200 -201 -202 -203 -204 -205 -206 -207 -208 -209 -210 -211 -212 -213 -214 -215 -216 -217 -218 -219 -220 -221 -222 -223 -224 -225 -226 -227 -228 -229 -230 -231 -232 -233 -234 -235 -236 -237 -238 -239 -240 -241 -242 -243 -244 -245 -246 -247 -248 -249 -250 -251 -252 -253 -254 -255 -256 -257 -258 -259 -260 -261 -262 -263 -264 -265 -266 -267 -268 -269 -270 -271 -272 -273 -274 -275 -276 -277 -278 -279 -280 -281 -282 -283 -284 -285 -286 -287 -288 -289 -290 -291 -292 -293 -294 -295 -296 -297 -298 -299 -300 -301 -302 -303 -304 -305 -306 -307 -308 -309 -310 -311 -312 -313 -314 -315 -316 -317 -318 -319 -320 -321 -322 -323 -324 -325 -326 -327 -328 -329 -330 -331 
-332 -333 -334 -335 -336 -337 -338 -339 -340 -341 -342 -343 -344 -345 -346 -347 -348 -349 -350 -351 -352 -353 -354 -355 -356 -357 -358 -359 -360 -361 -362 -363 -364 -365 -366 -367 -368 -369 -370 -371 -372 -373 -374 -375 -376 -377 -378 -379 -380 -381 -382 -383 -384 -385 -386 -387 -388 -389 -390 -391 -392 -393 -394 -395 -396 -397 -398 -399 -400 -401 -402 -403 -404 -405 -406 -407 -408 -409 -410 -411 -412 -413 -414 -415 -416 -417 -418 -419 -420 -421 -422 -423 -424 -425 -426 -427 -428 -429 -430 -431 -432 -433 -434 -435 -436 -437 -438 -439 -440 -441 -442 -443 -444 -445 -446 -447 -448 -449 -450 -451 -452 -453 -454 -455 -456 -457 -458 -459 -460 -461 -462 -463 -464 -465 -466 -467 -468 -469 -470 -471 -472 -473 -474 -475 -476 -477 -478 -479 -480 -481 -482 -483 -484 -485 -486 -487 -488 -489 -490 -491 -492 -493 -494 -495 -496 -497 -498 -499 -500 -501 -502 -503 -504 -505 -506 -507 -508 -509 -510 -511 -512 -513 -514 -515 -516 -517 -518 -519 -520 -521 -522 -523 -524 -525 -526 -527 -528 -529 -530 -531 -532 -533 -534 -535 -536 -537 -538 -539 -540 -541 -542 -543 -544 -545 -546 -547 -548 -549 -550 -551 -552 -553 -554 -555 -556 -557 -558 -559 -560 -561 -562 -563 -564 -565 -566 -567 -568 -569 -570 -571 -572 -573 -574 -575 -576 -577 -578 -579 -580 -581 -582 -583 -584 -585 -586 -587 -588 -589 -590 -591 -592 -593 -594 -595 -596 -597 -598 -599 -600 -601 -602 -603 -604 -605 -606 -607 -608 -609 -610 -611 -612 -613 -614 -615 -616 -617 -618 -619 -620 -621 -622 -623 -624 -625 -626 -627 -628 -629 -630 -631 -632 -633 -634 -635 -636 -637 -638 -639 -640 -641 -642 -643 -644 -645 -646 -647 -648 -649 -650 -651 -652 -653 -654 -655 -656 -657 -658 -659 -660 -661 -662 -663 -664 -665 -666 -667 -668 -669 -670 -671 -672 -673 -674 -675 -676 -677 -678 -679 -680 -681 -682 -683 -684 -685 -686 -687 -688 -689 -690 -691 -692 -693 -694 -695 -696 -697 -698 -699 -700 -701 -702 -703 -704 -705 -706 -707 -708 -709 -710 -711 -712 -713 -714 -715 -716 -717 -718 -719 -720 -721 -722 -723 -724 -725 -726 -727 -728 -729 -730 -731 -732 -733 -734 -735 -736 -737 -738 -739 -740 -741 -742 -743 -744 -745 -746 -747 -748 -749 -750 -751 -752 -753 -754 -755 -756 -757 -758 -759 -760 -761 -762 -763 -764 -765 -766 -767 -768 -769 -770 -771 -772 -773 -774 -775 -776 -777 -778 -779 -780 -781 -782 -783 -784 -785 -786 -787 -788 -789 -790 -791 -792 -793 -794 -795 -796 -797 -798 -799 -800 -801 -802 -803 -804 -805 -806 -807 -808 -809 -810 -811 -812 -813 -814 -815 -816 -817 -818 -819 -820 -821 -822 -823 -824 -825 -826 -827 -828 -829 -830 -831 -832 -833 -834 -835 -836 -837 -838 -839 -840 -841 -842 -843 -844 -845 -846 -847 -848 -849 -850 -851 -852 -853 -854 -855 -856 -857 -858 -859 -860 -861 -862 -863 -864 -865 -866 -867 -868 -869 -870 -871 -872 -873 -874 -875 -876 -877 -878 -879 -880 -881 -882 -883 -884 -885 -886 -887 -888 -889 -890 -891 -892 -893 -894 -895 -896 -897 -898 -899 -900 -901 -902 -903 -904 -905 -906 -907 -908 -909 -910 -911 -912 -913 -914 -915 -916 -917 -918 -919 -920 -921 -922 -923 -924 -925 -926 -927 -928 -929 -930 -931 -932 -933 -934 -935 -936 -937 -938 -939 -940 -941 -942 -943 -944 -945 -946 -947 -948 -949 -950 -951 -952 -953 -954 -955 -956 -957 -958 -959 -960 -961 -962 -963 -964 -965 -966 -967 -968 -969 -970 -971 -972 -973 -974 -975 -976 -977 -978 -979 -980 -981 -982 -983 -984 -985 -986 -987 -988 -989 -990 -991 -992 -993 -994 -995 -996 -997 -998 -999 -1000 diff --git a/pkg/erasure/erasure_encode.go b/pkg/erasure/erasure_encode.go index c17991b43..64233455d 100644 --- a/pkg/erasure/erasure_encode.go +++ b/pkg/erasure/erasure_encode.go @@ -171,7 +171,7 @@ func (e 
*Erasure) Encode(inputData []byte) (encodedBlocks [][]byte, err error) {
 	// Allocate memory to the "encoded blocks" return buffer
 	encodedBlocks = make([][]byte, n) // Return buffer
-	// Nessary to bridge Go to the C world. C requires 2D arry of pointers to
+	// Necessary to bridge Go to the C world. C requires a 2D array of pointers to
 	// byte array. "encodedBlocks" is a 2D slice.
 	pointersToEncodedBlock := make([]*byte, n) // Pointers to encoded blocks.
@@ -211,17 +211,23 @@ func (e *Erasure) EncodeStream(data io.Reader, size int64) ([][]byte, []byte, er
 	// Length of total number of "n" data chunks
 	encodedDataBlocksLen := encodedBlockLen * n
+	// allocate the input buffer with capacity for the encoded data blocks
 	inputData := make([]byte, size, encodedDataBlocksLen)
 	_, err := io.ReadFull(data, inputData)
 	if err != nil {
-		return nil, nil, err
+		// Do not excuse io.ErrUnexpectedEOF: we know exactly how many bytes should
+		// be read, so a short read means the reader may have been closed prematurely
+		// and must be reported as an error. Only a bare io.EOF is tolerated here.
+		if err != io.EOF {
+			return nil, nil, err
+		}
 	}
 	// Allocate memory to the "encoded blocks" return buffer
 	encodedBlocks := make([][]byte, n) // Return buffer
-	// Nessary to bridge Go to the C world. C requires 2D arry of pointers to
+	// Necessary to bridge Go to the C world. C requires a 2D array of pointers to
 	// byte array. "encodedBlocks" is a 2D slice.
 	pointersToEncodedBlock := make([]*byte, n) // Pointers to encoded blocks.
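
Note (outside the patch): the bucket.go hunk above fixes a closure bug by passing errCh into the per-block goroutine as a parameter instead of capturing it from the enclosing loop scope. The sketch below is a minimal, self-contained illustration of that pattern; the helper name copyBlock and the sample data are hypothetical and not part of the minio codebase.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// copyBlock streams one block to a writer in a goroutine and reports the
// result over a dedicated, buffered error channel. Passing errCh as a
// parameter (rather than capturing it from the enclosing scope) keeps each
// iteration's channel independent of the next.
func copyBlock(w io.Writer, r io.Reader) error {
	errCh := make(chan error, 1)
	go func(w io.Writer, r io.Reader, errCh chan<- error) {
		defer close(errCh)
		_, err := io.Copy(w, r)
		errCh <- err
	}(w, r, errCh)
	return <-errCh
}

func main() {
	blocks := [][]byte{[]byte("block-0 "), []byte("block-1\n")}
	for _, block := range blocks {
		if err := copyBlock(os.Stdout, bytes.NewReader(block)); err != nil {
			fmt.Fprintln(os.Stderr, "copy failed:", err)
			return
		}
	}
}

Because errCh is buffered with capacity 1, the goroutine can send its result without blocking, and the deferred close is safe since it runs after the send.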
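
Note (outside the patch): the EncodeStream change leans on io.ReadFull's error contract: it returns io.EOF only when nothing was read, and io.ErrUnexpectedEOF when the read came up short. Below is a small standalone sketch of that behavior, using a hypothetical helper readExactly that mirrors the patched logic.

package main

import (
	"fmt"
	"io"
	"strings"
)

// readExactly expects `size` bytes, tolerates a bare io.EOF (empty input),
// but propagates io.ErrUnexpectedEOF and any other error, since a short
// read may mean the reader was closed prematurely.
func readExactly(r io.Reader, size int64) ([]byte, error) {
	buf := make([]byte, size)
	_, err := io.ReadFull(r, buf)
	if err != nil && err != io.EOF {
		return nil, err
	}
	return buf, nil
}

func main() {
	// Full read: no error.
	if _, err := readExactly(strings.NewReader("0123456789"), 10); err != nil {
		fmt.Println("unexpected:", err)
	}
	// Empty reader: io.ReadFull returns io.EOF, which is tolerated here.
	if _, err := readExactly(strings.NewReader(""), 10); err != nil {
		fmt.Println("unexpected:", err)
	}
	// Short read: io.ReadFull returns io.ErrUnexpectedEOF, which is propagated.
	if _, err := readExactly(strings.NewReader("0123"), 10); err != nil {
		fmt.Println("short read rejected:", err)
	}
}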