diff --git a/erasure-readfile.go b/erasure-readfile.go
index 91f494db8..722c09c7e 100644
--- a/erasure-readfile.go
+++ b/erasure-readfile.go
@@ -17,6 +17,7 @@ package main
 
 import (
+    "bytes"
     "encoding/hex"
     "errors"
     "io"
@@ -25,6 +26,57 @@ import (
     "github.com/klauspost/reedsolomon"
 )
 
+// isSuccessDecodeBlocks - do we have enough blocks for a successful decode?
+// Input blocks are expected to be ordered, i.e. data blocks are followed
+// by parity blocks. For more information look at getOrderedDisks().
+func isSuccessDecodeBlocks(enBlocks [][]byte, dataBlocks int) bool {
+    // Count number of data and parity blocks that were read.
+    var successDataBlocksCount = 0
+    var successParityBlocksCount = 0
+    for index, block := range enBlocks {
+        if block == nil {
+            continue
+        }
+        if index < dataBlocks {
+            successDataBlocksCount++
+            continue
+        }
+        successParityBlocksCount++
+    }
+    // Returns true if we have at least dataBlocks+1 blocks in total.
+    return successDataBlocksCount+successParityBlocksCount >= dataBlocks+1
+}
+
+// isSuccessDataBlocks - do we have all the data blocks?
+// Input blocks are expected to be ordered, i.e. data blocks are followed
+// by parity blocks. For more information look at getOrderedDisks().
+func isSuccessDataBlocks(enBlocks [][]byte, dataBlocks int) bool {
+    // Count number of data blocks that were read.
+    var successDataBlocksCount = 0
+    for _, block := range enBlocks[:dataBlocks] {
+        if block == nil {
+            continue
+        }
+        successDataBlocksCount++
+    }
+    // Returns true if we have all the dataBlocks.
+    return successDataBlocksCount >= dataBlocks
+}
+
+// getOrderedDisks - get ordered disks from erasure distribution.
+// Returns an ordered slice of disks from their actual distribution.
+func getOrderedDisks(distribution []int, disks []StorageAPI, blockCheckSums []checkSumInfo) (orderedDisks []StorageAPI, orderedBlockCheckSums []checkSumInfo) {
+    orderedDisks = make([]StorageAPI, len(disks))
+    orderedBlockCheckSums = make([]checkSumInfo, len(disks))
+    // Reorder disks and block checksums according to the distribution.
+    for index := range disks {
+        blockIndex := distribution[index]
+        orderedDisks[blockIndex-1] = disks[index]
+        orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index]
+    }
+    return orderedDisks, orderedBlockCheckSums
+}
+
 // erasureReadFile - read bytes from erasure coded files and writes to given writer.
 // Erasure coded files are read block by block as per given erasureInfo and data chunks
 // are decoded into a data block. Data block is trimmed for given offset and length,
@@ -36,26 +88,24 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 
     // Gather previously calculated block checksums.
     blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName)
 
-    orderedBlockCheckSums := make([]checkSumInfo, len(disks))
-    // []orderedDisks will have first eInfo.DataBlocks disks as data disks and rest will be parity.
-    orderedDisks := make([]StorageAPI, len(disks))
-    for index := range disks {
-        blockIndex := eInfo.Distribution[index]
-        orderedDisks[blockIndex-1] = disks[index]
-        orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index]
-    }
+    // []orderedDisks will have first eInfo.DataBlocks disks as data
+    // disks and rest will be parity.
+    orderedDisks, orderedBlockCheckSums := getOrderedDisks(eInfo.Distribution, disks, blockCheckSums)
 
-    // bitrotVerify verifies if the file on a particular disk does not have bitrot by verifying the hash of
-    // the contents of the file.
+    // bitrotVerify verifies if the file on a particular disk doesn't have bitrot
+    // by verifying the hash of the contents of the file.
     bitrotVerify := func() func(diskIndex int) bool {
         verified := make([]bool, len(orderedDisks))
-        // Return closure so that we have reference to []verified and not recalculate the hash on it
-        // everytime the function is called for the same disk.
+        // Return a closure so that we keep a reference to []verified and
+        // do not recalculate the hash every time the function is
+        // called for the same disk.
         return func(diskIndex int) bool {
             if verified[diskIndex] {
+                // Already validated.
                 return true
             }
+            // Is this a valid block?
             isValid := isValidBlock(orderedDisks[diskIndex], volume, path, orderedBlockCheckSums[diskIndex])
             verified[diskIndex] = isValid
             return isValid
@@ -65,128 +115,166 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
     // Total bytes written to writer
     bytesWritten := int64(0)
 
+    // Each element of enBlocks holds the chunk read from its corresponding disk.
+    enBlocks := make([][]byte, len(orderedDisks))
+
     // chunkSize is roughly BlockSize/DataBlocks.
     // chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
     // So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
     // DataBlocks. The extra space will have 0-padding.
     chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks)
 
+    // Get start and end block, also bytes to be skipped based on the input offset.
     startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, eInfo.BlockSize)
 
     // For each block, read chunk from each disk. If we are able to read all the data disks then we don't
     // need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
     // of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
     for block := startBlock; bytesWritten < length; block++ {
+        // Reset enBlocks at the start of each block; otherwise chunks read
+        // for a previous block would be treated as valid shards below.
+        for i := range enBlocks {
+            enBlocks[i] = nil
+        }
+
-        // curChunkSize will be chunkSize except for the last block because the size of the last block
-        // can be less than BlockSize.
+        // curChunkSize is chunkSize except possibly for the last block.
         curChunkSize := chunkSize
         if block == endBlock && (totalLength%eInfo.BlockSize != 0) {
             // If this is the last block and size of the block is < BlockSize.
             curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks)
         }
 
-        // Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
-        enBlocks := make([][]byte, len(disks))
+        // Block offset.
+        // NOTE: For the offset calculation we have to use chunkSize and
+        // not curChunkSize. If we use curChunkSize for the offset calculation
+        // then it can result in a wrong offset for the last block.
+        blockOffset := block * chunkSize
 
         // Figure out the number of disks that are needed for the read.
         // We will need DataBlocks number of disks if all the data disks are up.
         // We will need DataBlocks+1 number of disks even if one of the data disks is down.
-        diskCount := 0
+        readableDiskCount := 0
 
+        // Count the number of data disks that are up.
         for _, disk := range orderedDisks[:eInfo.DataBlocks] {
             if disk == nil {
                 continue
             }
-            diskCount++
+            readableDiskCount++
         }
 
-        if diskCount < eInfo.DataBlocks {
-            // Not enough data disks up, so we need DataBlocks+1 number of disks for reed-solomon Reconstruct()
-            diskCount = eInfo.DataBlocks + 1
+        // Do we have enough readable data disks?
+        if readableDiskCount < eInfo.DataBlocks {
+            // Not enough data disks up, so we need DataBlocks+1 number
+            // of disks for reed-solomon Reconstruct().
+            readableDiskCount = eInfo.DataBlocks + 1
         }
 
-        wg := &sync.WaitGroup{}
+        // Initialize wait group.
+        var wg = &sync.WaitGroup{}
 
-        // current disk index from which to read, this will be used later in case one of the parallel reads fails.
+        // Current disk index from which to read, this will be used later
+        // in case one of the parallel reads fails.
         index := 0
+        // Read from the disks in parallel.
         for _, disk := range orderedDisks {
             if disk == nil {
                 index++
                 continue
             }
+
+            // Increment wait group.
             wg.Add(1)
+
+            // Start reading from disk in a goroutine.
             go func(index int, disk StorageAPI) {
                 defer wg.Done()
-                ok := bitrotVerify(index)
-                if !ok {
+
+                // Verify bit rot for this disk slice.
+                if !bitrotVerify(index) {
                     // So that we don't read from this disk for the next block.
                     orderedDisks[index] = nil
                     return
                 }
-                buf := make([]byte, curChunkSize)
-                // Note that for the offset calculation we have to use chunkSize and not
-                // curChunkSize. If we use curChunkSize for offset calculation then it
-                // can result in wrong offset for the last block.
-                n, err := disk.ReadFile(volume, path, block*chunkSize, buf)
+
+                // Chunk writer.
+                chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
+
+                // copyN copies until current chunk size.
+                err := copyN(chunkWriter, disk, volume, path, blockOffset, curChunkSize)
                 if err != nil {
                     // So that we don't read from this disk for the next block.
                     orderedDisks[index] = nil
                     return
                 }
-                enBlocks[index] = buf[:n]
+
+                // Copy the read chunk.
+                enBlocks[index] = chunkWriter.Bytes()
+
+                // Successfully read.
             }(index, disk)
+
             index++
-            diskCount--
-            if diskCount == 0 {
+            readableDiskCount--
+            // We have read all the readable disks.
+            if readableDiskCount == 0 {
                 break
             }
         }
+
+        // Wait for all the reads to finish.
         wg.Wait()
 
-        // Count number of data and parity blocks that were read.
-        var successDataBlocksCount = 0
-        var successParityBlocksCount = 0
-        for bufidx, buf := range enBlocks {
-            if buf == nil {
-                continue
-            }
-            if bufidx < eInfo.DataBlocks {
-                successDataBlocksCount++
-                continue
-            }
-            successParityBlocksCount++
-        }
+        // FIXME: make this parallel.
 
-        if successDataBlocksCount < eInfo.DataBlocks {
-            // If we don't have DataBlocks number of data blocks we will have to read enough
-            // parity blocks such that we have DataBlocks+1 number for blocks for reedsolomon.Reconstruct()
+        // If we have all the data blocks no need to decode.
+        if !isSuccessDataBlocks(enBlocks, eInfo.DataBlocks) {
+            // If we don't have DataBlocks number of data blocks we
+            // will have to read enough parity blocks such that we
+            // have DataBlocks+1 number of blocks for rs.Reconstruct().
+            // index points past the last disk read in the loop above.
             for ; index < len(orderedDisks); index++ {
-                if (successDataBlocksCount + successParityBlocksCount) == (eInfo.DataBlocks + 1) {
-                    // We have DataBlocks+1 blocks, enough for reedsolomon.Reconstruct()
+                // We have enough blocks to decode, break out.
+                if isSuccessDecodeBlocks(enBlocks, eInfo.DataBlocks) {
+                    // We have DataBlocks+1 blocks, enough for rs.Reconstruct().
                     break
                 }
-                ok := bitrotVerify(index)
-                if !ok {
+
+                // This disk was previously set to nil and ignored, do not read again.
+                if orderedDisks[index] == nil {
+                    continue
+                }
+
+                // Verify bit-rot for this index.
+                if !bitrotVerify(index) {
                     // Mark nil so that we don't read from this disk for the next block.
                     orderedDisks[index] = nil
                     continue
                 }
-                buf := make([]byte, curChunkSize)
-                n, err := orderedDisks[index].ReadFile(volume, path, block*chunkSize, buf)
+
+                // Chunk writer.
+                chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
+
+                // copyN copies until current chunk size.
+                err := copyN(chunkWriter, orderedDisks[index], volume, path, blockOffset, curChunkSize)
                 if err != nil {
-                    // Mark nil so that we don't read from this disk for the next block.
+                    // ERROR: Mark nil so that we don't read from
+                    // this disk for the next block.
                     orderedDisks[index] = nil
                     continue
                 }
-                successParityBlocksCount++
-                enBlocks[index] = buf[:n]
+
+                // Copy the read chunk.
+                enBlocks[index] = chunkWriter.Bytes()
             }
 
+            // Reconstruct the missing data blocks.
             err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks)
             if err != nil {
                 return bytesWritten, err
             }
+
+            // Success.
         }
 
         var outSize, outOffset int64
@@ -209,14 +297,18 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
             // We should not send more data than what was requested.
             outSize = length - bytesWritten
         }
+        // Write data blocks.
         n, err := writeDataBlocks(writer, enBlocks, eInfo.DataBlocks, outOffset, outSize)
         if err != nil {
            return bytesWritten, err
         }
+
+        // Update total bytes written.
         bytesWritten += n
     }
 
+    // Success.
     return bytesWritten, nil
 }
 
@@ -273,14 +365,18 @@ func isValidBlock(disk StorageAPI, volume, path string, blockCheckSum checkSumIn
 
 // decodeData - decode encoded blocks.
 func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
+    // Initialize reedsolomon.
     rs, err := reedsolomon.New(dataBlocks, parityBlocks)
     if err != nil {
         return err
     }
+
+    // Reconstruct encoded blocks.
     err = rs.Reconstruct(enBlocks)
     if err != nil {
         return err
     }
+
     // Verify reconstructed blocks (parity).
     ok, err := rs.Verify(enBlocks)
     if err != nil {
@@ -291,5 +387,7 @@ func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
         err = errors.New("Verification failed after reconstruction, data likely corrupted.")
         return err
     }
+
+    // Success.
     return nil
 }
diff --git a/erasure-utils.go b/erasure-utils.go
index fec8e0d02..c5b45d204 100644
--- a/erasure-utils.go
+++ b/erasure-utils.go
@@ -18,6 +18,7 @@ package main
 
 import (
     "bytes"
+    "errors"
     "hash"
     "io"
 
@@ -46,21 +47,17 @@ func newHash(algo string) hash.Hash {
     }
 }
 
+// hashSum computes the hash of the entire content at volume/path and returns it.
func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) {
-    startOffset := int64(0)
-    // Read until io.EOF.
-    for {
-        buf := make([]byte, blockSizeV1)
-        n, err := disk.ReadFile(volume, path, startOffset, buf)
-        if err == io.EOF {
-            break
-        }
-        if err != nil {
-            return nil, err
-        }
-        writer.Write(buf[:n])
-        startOffset += n
+    // Allocate a 128KiB staging buffer for copyBuffer.
+    buf := make([]byte, 128*1024)
+
+    // Copy the entire file content into the hash writer.
+    if err := copyBuffer(writer, disk, volume, path, buf); err != nil {
+        return nil, err
     }
+
+    // Return the final hash sum.
     return writer.Sum(nil), nil
 }
 
@@ -149,3 +146,91 @@ func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64)
     curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
     return curEncBlockSize
 }
+
+// copyN - copies from disk, volume, path to the input writer until length
+// bytes have been read or an error occurs. A successful copyN returns
+// err == nil, not err == EOF. Additionally an offset can be provided to start
+// the read at. copyN returns io.EOF if there isn't enough data to be read.
+func copyN(writer io.Writer, disk StorageAPI, volume string, path string, offset int64, length int64) (err error) {
+    // Use a 128KiB staging buffer to read up to length.
+    buf := make([]byte, 128*1024)
+
+    // Read into writer until length.
+    for length > 0 {
+        nr, er := disk.ReadFile(volume, path, offset, buf)
+        if nr > 0 {
+            nw, ew := writer.Write(buf[0:nr])
+            if nw > 0 {
+                // Decrement the length.
+                length -= int64(nw)
+
+                // Progress the offset.
+                offset += int64(nw)
+            }
+            if ew != nil {
+                err = ew
+                break
+            }
+            if nr != int64(nw) {
+                err = io.ErrShortWrite
+                break
+            }
+        }
+        if er == io.EOF || er == io.ErrUnexpectedEOF {
+            // Reached EOF before length bytes were read, report io.EOF.
+            if length > 0 {
+                err = io.EOF
+            }
+            break
+        }
+        if er != nil {
+            err = er
+            break
+        }
+    }
+
+    // Success.
+    return err
+}
+
+// copyBuffer - copies from disk, volume, path to the input writer until either EOF
+// is reached at volume, path or an error occurs. A successful copyBuffer returns
+// err == nil, not err == EOF. Because copyBuffer is defined to read from path
+// until EOF, it does not treat an EOF from ReadFile as an error to be reported.
+// copyBuffer stages through the provided buffer; if the buffer has zero length,
+// an error is returned.
+func copyBuffer(writer io.Writer, disk StorageAPI, volume string, path string, buf []byte) error {
+    // Error condition of zero length buffer (covers a nil buffer as well).
+    if len(buf) == 0 {
+        return errors.New("empty buffer in copyBuffer")
+    }
+
+    // Starting offset for reading the file.
+    startOffset := int64(0)
+
+    // Read until io.EOF.
+    for {
+        n, err := disk.ReadFile(volume, path, startOffset, buf)
+        if n > 0 {
+            var m int
+            m, err = writer.Write(buf[:n])
+            if err != nil {
+                return err
+            }
+            if int64(m) != n {
+                return io.ErrShortWrite
+            }
+        }
+        if err != nil {
+            if err == io.EOF || err == io.ErrUnexpectedEOF {
+                break
+            }
+            return err
+        }
+        // Progress the offset.
+        startOffset += n
+    }
+
+    // Success.
+    return nil
+}
diff --git a/format-config-v1.go b/format-config-v1.go
index f33c31606..32c0d9aa5 100644
--- a/format-config-v1.go
+++ b/format-config-v1.go
@@ -17,6 +17,7 @@ package main
 
 import (
+    "bytes"
     "encoding/json"
     "errors"
     "fmt"
@@ -346,9 +347,10 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1)
 
 // loadFormat - loads format.json from disk.
 func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
-    var buffer []byte
-    buffer, err = readAll(disk, minioMetaBucket, formatConfigFile)
-    if err != nil {
+    // Allocate a 32KiB staging buffer for copyBuffer.
+    buf := make([]byte, 32*1024)
+    var buffer = new(bytes.Buffer)
+    if err = copyBuffer(buffer, disk, minioMetaBucket, formatConfigFile, buf); err != nil {
        // 'file not found' and 'volume not found' as
        // same. 'volume not found' usually means its a fresh disk.
        if err == errFileNotFound || err == errVolumeNotFound {
@@ -366,11 +368,15 @@ func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
        }
        return nil, err
    }
+
+    // Try to decode format json into formatConfigV1 struct.
     format = &formatConfigV1{}
-    err = json.Unmarshal(buffer, format)
-    if err != nil {
+    d := json.NewDecoder(buffer)
+    if err = d.Decode(format); err != nil {
        return nil, err
    }
+
+    // Success.
     return format, nil
 }
 
diff --git a/fs-v1-metadata.go b/fs-v1-metadata.go
index a45c6c28e..902b5fb1d 100644
--- a/fs-v1-metadata.go
+++ b/fs-v1-metadata.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+    "bytes"
     "encoding/json"
     "path"
     "sort"
@@ -57,16 +58,23 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
 }
 
 // readFSMetadata - returns the object metadata `fs.json` content.
-func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) {
-    var buffer []byte
-    buffer, err = readAll(fs.storage, bucket, path.Join(object, fsMetaJSONFile))
-    if err != nil {
+func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, err error) {
+    // 32KiB staging buffer for copying `fs.json`.
+    var buf = make([]byte, 32*1024)
+
+    // `fs.json` writer.
+    var buffer = new(bytes.Buffer)
+    if err = copyBuffer(buffer, disk, bucket, path.Join(object, fsMetaJSONFile), buf); err != nil {
         return fsMetaV1{}, err
     }
-    err = json.Unmarshal(buffer, &fsMeta)
-    if err != nil {
+
+    // Decode `fs.json` into the fsMeta structure.
+    d := json.NewDecoder(buffer)
+    if err = d.Decode(&fsMeta); err != nil {
         return fsMetaV1{}, err
     }
+
+    // Success.
     return fsMeta, nil
 }
diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go
index 6e9fadc87..de8cd5f13 100644
--- a/fs-v1-multipart.go
+++ b/fs-v1-multipart.go
@@ -299,7 +299,8 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
     // Initialize md5 writer.
     md5Writer := md5.New()
 
-    var buf = make([]byte, blockSizeV1)
+    // Allocate a 128KiB staging buffer.
+    var buf = make([]byte, 128*1024)
     for {
         n, err := io.ReadFull(data, buf)
         if err == io.EOF {
@@ -331,7 +332,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
         return "", InvalidUploadID{UploadID: uploadID}
     }
 
-    fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath)
+    fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
     if err != nil {
         return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
     }
@@ -367,7 +368,7 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM
     result := ListPartsInfo{}
 
     uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
-    fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath)
+    fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
     if err != nil {
         return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath)
     }
@@ -475,7 +476,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
     }
 
     // Read saved fs metadata for ongoing multipart.
-    fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath)
+    fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
     if err != nil {
         return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
     }
@@ -487,7 +488,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
     }
 
     tempObj := path.Join(tmpMetaPrefix, uploadID, "object1")
-    var buffer = make([]byte, blockSizeV1)
+
+    // Allocate a 128KiB staging buffer.
+    var buf = make([]byte, 128*1024)
 
     // Loop through all parts, validate them and then commit to disk.
     for i, part := range parts {
@@ -509,16 +512,21 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
         totalLeft := fsMeta.Parts[partIdx].Size
         for totalLeft > 0 {
             var n int64
-            n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buffer)
+            n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf)
+            if n > 0 {
+                if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil {
+                    return "", toObjectErr(err, minioMetaBucket, tempObj)
+                }
+            }
             if err != nil {
+                if err == io.EOF || err == io.ErrUnexpectedEOF {
+                    break
+                }
                 if err == errFileNotFound {
                     return "", InvalidPart{}
                 }
                 return "", toObjectErr(err, minioMetaBucket, multipartPartFile)
             }
-            if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buffer[:n]); err != nil {
-                return "", toObjectErr(err, minioMetaBucket, tempObj)
-            }
             offset += n
             totalLeft -= n
         }
diff --git a/fs-v1.go b/fs-v1.go
index 477ad6702..24c8f7c6e 100644
--- a/fs-v1.go
+++ b/fs-v1.go
@@ -17,8 +17,10 @@ package main
 
 import (
+    "bytes"
     "crypto/md5"
     "encoding/hex"
+    "encoding/json"
     "io"
     "os"
     "path"
@@ -45,8 +47,26 @@ func initFormatFS(storageDisk StorageAPI) error {
 }
 
 // loads format.json from minioMetaBucket if it exists.
-func loadFormatFS(storageDisk StorageAPI) ([]byte, error) {
-    return readAll(storageDisk, minioMetaBucket, fsFormatJSONFile)
+func loadFormatFS(storageDisk StorageAPI) (format formatConfigV1, err error) {
+    // Allocate a 32KiB buffer; this is sufficient for most `format.json` files.
+    buf := make([]byte, 32*1024)
+
+    // Allocate a new `format.json` buffer writer.
+    var buffer = new(bytes.Buffer)
+
+    // Read the entire `format.json`.
+    if err = copyBuffer(buffer, storageDisk, minioMetaBucket, fsFormatJSONFile, buf); err != nil {
+        return formatConfigV1{}, err
+    }
+
+    // Decode format config.
+    d := json.NewDecoder(buffer)
+    if err = d.Decode(&format); err != nil {
+        return formatConfigV1{}, err
+    }
+
+    // Return structured `format.json`.
+    return format, nil
 }
 
 // Should be called when process shuts down.
@@ -74,6 +94,7 @@ func newFSObjects(disk string) (ObjectLayer, error) {
 
     // Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc.
     fsHouseKeeping(storage)
+
     // loading format.json from minioMetaBucket.
     // Note: The format.json content is ignored, reserved for future use.
     _, err = loadFormatFS(storage)
@@ -88,10 +109,12 @@ func newFSObjects(disk string) (ObjectLayer, error) {
             return nil, err
         }
     }
+
     // Register the callback that should be called when the process shuts down.
     registerShutdown(func() {
         shutdownFS(storage)
     })
+
     // Return successfully initialized object layer.
     return fsObjects{
         storage: storage,
@@ -178,7 +201,7 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
 
 /// Object Operations
 
 // GetObject - get an object.
-func (fs fsObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error {
+func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) error {
     // Verify if bucket is valid.
     if !IsValidBucketName(bucket) {
         return BucketNameInvalid{Bucket: bucket}
@@ -188,26 +211,28 @@ func (fs fsObjects) GetObject(bucket, object string, startOffset int64, length i
         return ObjectNameInvalid{Bucket: bucket, Object: object}
     }
     var totalLeft = length
+    buf := make([]byte, 32*1024) // Allocate a 32KiB staging buffer.
     for totalLeft > 0 {
-        // Figure out the right blockSize as it was encoded before.
-        var curBlockSize int64
+        // Figure out how much to read; cap reads at the staging buffer size.
+        var curSize int64
-        if blockSizeV1 < totalLeft {
-            curBlockSize = blockSizeV1
+        if int64(len(buf)) < totalLeft {
+            curSize = int64(len(buf))
         } else {
-            curBlockSize = totalLeft
+            curSize = totalLeft
         }
-        buf := make([]byte, curBlockSize)
-        n, err := fs.storage.ReadFile(bucket, object, startOffset, buf)
+        // Read the file at offset.
+        n, err := fs.storage.ReadFile(bucket, object, offset, buf[:curSize])
         if err != nil {
             return toObjectErr(err, bucket, object)
         }
-        _, err = writer.Write(buf[:n])
+        // Write to the response writer.
+        m, err := writer.Write(buf[:n])
         if err != nil {
             return toObjectErr(err, bucket, object)
         }
-        totalLeft -= n
-        startOffset += n
-    }
+        totalLeft -= int64(m)
+        offset += int64(m)
+    }
     return nil
 }
 
@@ -276,7 +301,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
         }
     } else {
         // Allocate a buffer to Read() the object upload stream.
-        buf := make([]byte, blockSizeV1)
+        buf := make([]byte, 32*1024)
         // Read the buffer till io.EOF and append the read data to
         // the temporary file.
         for {
diff --git a/posix.go b/posix.go
index 226dfcff8..d58652f98 100644
--- a/posix.go
+++ b/posix.go
@@ -418,14 +418,9 @@ func (s *posix) ReadFile(volume string, path string, offset int64, buf []byte) (
     // Close the reader.
     defer file.Close()
 
-    // Read file.
+    // Read until the buffer is full.
     m, err := io.ReadFull(file, buf)
 
-    // Error unexpected is valid, set this back to nil.
-    if err == io.ErrUnexpectedEOF {
-        err = nil
-    }
-
-    // Success.
     return int64(m), err
 }
diff --git a/xl-v1-healing.go b/xl-v1-healing.go
index 188acee51..470d67ce7 100644
--- a/xl-v1-healing.go
+++ b/xl-v1-healing.go
@@ -16,11 +16,7 @@
 
 package main
 
-import (
-    "encoding/json"
-    "path"
-    "sync"
-)
+import "sync"
 
 // Get the highest integer from a given integer slice.
 func highestInt(intSlice []int64, highestInt int64) (highestInteger int64) {
@@ -54,28 +50,23 @@ func listObjectVersions(partsMetadata []xlMetaV1, errs []error) (versions []int6
 func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []error) {
     errs := make([]error, len(xl.storageDisks))
     metadataArray := make([]xlMetaV1, len(xl.storageDisks))
-    xlMetaPath := path.Join(object, xlMetaJSONFile)
     var wg = &sync.WaitGroup{}
+    // Read `xl.json` in parallel across disks.
     for index, disk := range xl.storageDisks {
         if disk == nil {
             errs[index] = errDiskNotFound
             continue
         }
         wg.Add(1)
+        // Read `xl.json` in a goroutine.
         go func(index int, disk StorageAPI) {
             defer wg.Done()
-            buffer, err := readAll(disk, bucket, xlMetaPath)
+            var err error
+            metadataArray[index], err = readXLMeta(disk, bucket, object)
             if err != nil {
                 errs[index] = err
                 return
             }
-            err = json.Unmarshal(buffer, &metadataArray[index])
-            if err != nil {
-                // Unable to parse xl.json, set error.
-                errs[index] = err
-                return
-            }
-            errs[index] = nil
         }(index, disk)
     }
diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go
index 15bc1e79c..a061bc0ef 100644
--- a/xl-v1-metadata.go
+++ b/xl-v1-metadata.go
@@ -201,8 +201,7 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
         if disk == nil {
             continue
         }
-        var buf []byte
-        buf, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile))
+        xlMeta, err = readXLMeta(disk, bucket, object)
         if err != nil {
             // For any reason disk is not available continue and read from other disks.
             if err == errDiskNotFound || err == errFaultyDisk {
                 continue
             }
             return xlMetaV1{}, err
         }
-        err = json.Unmarshal(buf, &xlMeta)
-        if err != nil {
-            return xlMetaV1{}, err
-        }
         break
     }
     return xlMeta, nil
diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go
index f3aed0e02..1ece15f80 100644
--- a/xl-v1-multipart-common.go
+++ b/xl-v1-multipart-common.go
@@ -17,6 +17,7 @@ package main
 
 import (
+    "bytes"
     "encoding/json"
     "path"
     "sort"
@@ -69,16 +70,25 @@ func (u uploadsV1) Index(uploadID string) int {
 
 // readUploadsJSON - get all the saved uploads JSON.
 func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) {
+    // 128KiB staging buffer kept for reading `uploads.json`.
+    var buf = make([]byte, 128*1024)
+
+    // Writer holding the `uploads.json` content.
+    var buffer = new(bytes.Buffer)
+
     uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
-    // Read all of 'uploads.json'
-    buffer, rErr := readAll(disk, minioMetaBucket, uploadJSONPath)
-    if rErr != nil {
-        return uploadsV1{}, rErr
+    // Read the entire `uploads.json`.
+    if err = copyBuffer(buffer, disk, minioMetaBucket, uploadJSONPath, buf); err != nil {
+        return uploadsV1{}, err
     }
-    rErr = json.Unmarshal(buffer, &uploadIDs)
-    if rErr != nil {
-        return uploadsV1{}, rErr
+
+    // Decode `uploads.json`.
+    d := json.NewDecoder(buffer)
+    if err = d.Decode(&uploadIDs); err != nil {
+        return uploadsV1{}, err
     }
+
+    // Success.
     return uploadIDs, nil
 }
diff --git a/xl-v1-utils.go b/xl-v1-utils.go
index b102548dd..394eccfa9 100644
--- a/xl-v1-utils.go
+++ b/xl-v1-utils.go
@@ -18,7 +18,7 @@ package main
 
 import (
     "bytes"
-    "io"
+    "encoding/json"
     "math/rand"
     "path"
     "time"
@@ -64,48 +64,25 @@ func randInts(count int) []int {
     return ints
 }
 
-// readAll - returns contents from volume/path as byte array.
-func readAll(disk StorageAPI, volume string, path string) ([]byte, error) {
-    var writer = new(bytes.Buffer)
-    startOffset := int64(0)
+// readXLMeta reads `xl.json` and returns its contents as structured xlMetaV1.
+func readXLMeta(disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1, err error) {
+    // Allocate a 128KiB buffer; this is sufficient for most `xl.json` files.
+    buf := make([]byte, 128*1024)
 
-    // Allocate 10MiB buffer.
-    buf := make([]byte, blockSizeV1)
+    // Allocate a new `xl.json` buffer writer.
+    var buffer = new(bytes.Buffer)
 
-    // Read until io.EOF.
-    for {
-        n, err := disk.ReadFile(volume, path, startOffset, buf)
-        if err == io.EOF {
-            break
-        }
-        if err != nil && err != io.EOF {
-            return nil, err
-        }
-        writer.Write(buf[:n])
-        startOffset += n
+    // Read the entire `xl.json`.
+    if err = copyBuffer(buffer, disk, bucket, path.Join(object, xlMetaJSONFile), buf); err != nil {
+        return xlMetaV1{}, err
     }
-    return writer.Bytes(), nil
-}
-
-// readXLMeta reads `xl.json` returns contents as byte array.
-func readXLMeta(disk StorageAPI, bucket string, object string) ([]byte, error) {
-    var writer = new(bytes.Buffer)
-    startOffset := int64(0)
-
-    // Allocate 2MiB buffer, this is sufficient for the most of `xl.json`.
-    buf := make([]byte, 2*1024*1024)
-    // Read until io.EOF.
-    for {
-        n, err := disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), startOffset, buf)
-        if err == io.EOF {
-            break
-        }
-        if err != nil && err != io.EOF {
-            return nil, err
-        }
-        writer.Write(buf[:n])
-        startOffset += n
+    // Decode xl metadata.
+    d := json.NewDecoder(buffer)
+    if err = d.Decode(&xlMeta); err != nil {
+        return xlMetaV1{}, err
     }
-    return writer.Bytes(), nil
+
+    // Return structured `xl.json`.
+    return xlMeta, nil
 }
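
The staging-buffer read pattern this patch introduces throughout (stream a file through a small fixed buffer into a bytes.Buffer, then decode JSON with json.NewDecoder) can be exercised in isolation. The sketch below is illustrative only: the `source` type and its `readAt` method stand in for StorageAPI.ReadFile, and `stream` mirrors copyBuffer; none of these names come from the patch itself.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
)

// source stands in for a StorageAPI disk. readAt mimics the posix
// ReadFile contract: fill buf starting at offset, io.EOF at end of
// file, io.ErrUnexpectedEOF on a short read.
type source struct{ data []byte }

func (s source) readAt(offset int64, buf []byte) (int64, error) {
    if offset >= int64(len(s.data)) {
        return 0, io.EOF
    }
    n := copy(buf, s.data[offset:])
    if n < len(buf) {
        return int64(n), io.ErrUnexpectedEOF
    }
    return int64(n), nil
}

// stream mirrors copyBuffer: stage reads through buf, append them to w,
// and treat io.EOF/io.ErrUnexpectedEOF as normal termination.
func stream(w io.Writer, s source, buf []byte) error {
    var offset int64
    for {
        n, err := s.readAt(offset, buf)
        if n > 0 {
            if _, werr := w.Write(buf[:n]); werr != nil {
                return werr
            }
            offset += n
        }
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            return nil
        }
        if err != nil {
            return err
        }
    }
}

func main() {
    src := source{data: []byte(`{"version":"1","format":"xl"}`)}
    staging := make([]byte, 8) // deliberately tiny staging buffer
    var buffer bytes.Buffer
    if err := stream(&buffer, src, staging); err != nil {
        panic(err)
    }
    var meta struct {
        Version string `json:"version"`
        Format  string `json:"format"`
    }
    if err := json.NewDecoder(&buffer).Decode(&meta); err != nil {
        panic(err)
    }
    fmt.Println(meta.Version, meta.Format) // prints: 1 xl
}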
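The disk-ordering and decode-quorum logic in erasure-readfile.go can likewise be sketched standalone. This is a hedged illustration, not the patch's code: disks are reduced to online/offline booleans, and `orderDisks` and `canDecode` are invented names mirroring getOrderedDisks and isSuccessDecodeBlocks under those assumptions.

package main

import "fmt"

// orderDisks mirrors getOrderedDisks: distribution[i] holds the 1-based
// block index stored on physical disk i, so ordered[distribution[i]-1]
// receives disk i. After ordering, data blocks occupy indices
// [0, dataBlocks) and parity blocks the rest.
func orderDisks(distribution []int, disks []bool) []bool {
    ordered := make([]bool, len(disks))
    for i := range disks {
        ordered[distribution[i]-1] = disks[i]
    }
    return ordered
}

// canDecode mirrors isSuccessDecodeBlocks: the patch requires at least
// dataBlocks+1 readable blocks before attempting Reconstruct(), one more
// than the Reed-Solomon minimum of dataBlocks shards.
func canDecode(ordered []bool, dataBlocks int) bool {
    count := 0
    for _, ok := range ordered {
        if ok {
            count++
        }
    }
    return count >= dataBlocks+1
}

func main() {
    // 4 data + 2 parity blocks with an arbitrary distribution.
    distribution := []int{3, 1, 5, 2, 6, 4}
    disks := []bool{true, true, true, false, true, true} // disk 3 is offline
    ordered := orderDisks(distribution, disks)
    fmt.Println(ordered)               // [true false true true true true]
    fmt.Println(canDecode(ordered, 4)) // true: 5 of 6 blocks readable
}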