XL: Implement new ReadAll API for files which are read in single call. (#1974)

Add a unit test as well.
master
Harshavardhana 9 years ago committed by Anand Babu (AB) Periasamy
parent ed2fdd90b0
commit 42286cba70
  1. 10
      format-config-v1.go
  2. 15
      fs-v1-metadata.go
  3. 13
      fs-v1.go
  4. 65
      posix.go
  5. 92
      posix_test.go
  6. 14
      rpc-client.go
  7. 9
      rpc-server-datatypes.go
  8. 10
      rpc-server.go
  9. 3
      storage-interface.go
  10. 13
      xl-v1-multipart-common.go
  11. 15
      xl-v1-utils.go

@ -17,7 +17,6 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -347,10 +346,8 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1)
// loadFormat - loads format.json from disk. // loadFormat - loads format.json from disk.
func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) { func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
// Allocate staging buffer of 32KiB for copyBuffer. buf, err := disk.ReadAll(minioMetaBucket, formatConfigFile)
buf := make([]byte, 32*1024) if err != nil {
var buffer = new(bytes.Buffer)
if err = copyBuffer(buffer, disk, minioMetaBucket, formatConfigFile, buf); err != nil {
// 'file not found' and 'volume not found' as // 'file not found' and 'volume not found' as
// same. 'volume not found' usually means its a fresh disk. // same. 'volume not found' usually means its a fresh disk.
if err == errFileNotFound || err == errVolumeNotFound { if err == errFileNotFound || err == errVolumeNotFound {
@ -371,8 +368,7 @@ func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
// Try to decode format json into formatConfigV1 struct. // Try to decode format json into formatConfigV1 struct.
format = &formatConfigV1{} format = &formatConfigV1{}
d := json.NewDecoder(buffer) if err = json.Unmarshal(buf, format); err != nil {
if err = d.Decode(format); err != nil {
return nil, err return nil, err
} }

@ -1,7 +1,6 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"path" "path"
"sort" "sort"
@ -59,18 +58,14 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
// readFSMetadata - returns the object metadata `fs.json` content. // readFSMetadata - returns the object metadata `fs.json` content.
func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, err error) { func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, err error) {
// 32KiB staging buffer for copying `fs.json`. // Read all `fs.json`.
var buf = make([]byte, 32*1024) buf, err := disk.ReadAll(bucket, path.Join(object, fsMetaJSONFile))
if err != nil {
// `fs.json` writer.
var buffer = new(bytes.Buffer)
if err = copyBuffer(buffer, disk, bucket, path.Join(object, fsMetaJSONFile), buf); err != nil {
return fsMetaV1{}, err return fsMetaV1{}, err
} }
// Decode `fs.json` into fsMeta structure. // Decode `fs.json` into fsMeta structure.
d := json.NewDecoder(buffer) if err = json.Unmarshal(buf, &fsMeta); err != nil {
if err = d.Decode(&fsMeta); err != nil {
return fsMetaV1{}, err return fsMetaV1{}, err
} }

@ -17,7 +17,6 @@
package main package main
import ( import (
"bytes"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
@ -48,20 +47,14 @@ func initFormatFS(storageDisk StorageAPI) error {
// loads format.json from minioMetaBucket if it exists. // loads format.json from minioMetaBucket if it exists.
func loadFormatFS(storageDisk StorageAPI) (format formatConfigV1, err error) { func loadFormatFS(storageDisk StorageAPI) (format formatConfigV1, err error) {
// Allocate 32k buffer, this is sufficient for the most of `format.json`.
buf := make([]byte, 32*1024)
// Allocate a new `format.json` buffer writer.
var buffer = new(bytes.Buffer)
// Reads entire `format.json`. // Reads entire `format.json`.
if err = copyBuffer(buffer, storageDisk, minioMetaBucket, fsFormatJSONFile, buf); err != nil { buf, err := storageDisk.ReadAll(minioMetaBucket, fsFormatJSONFile)
if err != nil {
return formatConfigV1{}, err return formatConfigV1{}, err
} }
// Unmarshal format config. // Unmarshal format config.
d := json.NewDecoder(buffer) if err = json.Unmarshal(buf, &format); err != nil {
if err = d.Decode(&format); err != nil {
return formatConfigV1{}, err return formatConfigV1{}, err
} }

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"io" "io"
"io/ioutil"
"os" "os"
slashpath "path" slashpath "path"
"path/filepath" "path/filepath"
@ -352,6 +353,67 @@ func (s *posix) ListDir(volume, dirPath string) (entries []string, err error) {
return readDir(pathJoin(volumeDir, dirPath)) return readDir(pathJoin(volumeDir, dirPath))
} }
// ReadAll reads the entire file at `path` under `volume` and returns its
// contents in a byte slice. A successful call returns err == nil, not
// err == EOF: because ReadAll is defined to read until EOF, an EOF from the
// underlying read is not reported as an error.
//
// This API is meant to be used on files which have a small memory footprint;
// do not use it on large files as the whole file is loaded into memory.
func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
	// Count consecutive I/O errors so the disk can be marked faulty.
	defer func() {
		if err == syscall.EIO {
			atomic.AddInt32(&s.ioErrCount, 1)
		}
	}()
	// The counter is incremented with sync/atomic in the deferred func above,
	// so it must also be read atomically to avoid a data race.
	if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
		return nil, errFaultyDisk
	}
	// Validate if disk is free.
	if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
		return nil, err
	}
	volumeDir, err := s.getVolDir(volume)
	if err != nil {
		return nil, err
	}
	// Stat a volume entry.
	_, err = os.Stat(preparePath(volumeDir))
	if err != nil {
		if os.IsNotExist(err) {
			return nil, errVolumeNotFound
		}
		return nil, err
	}
	// Validate file path length, before reading.
	filePath := pathJoin(volumeDir, path)
	if err = checkPathLength(filePath); err != nil {
		return nil, err
	}
	// Read the entire file in one call.
	buf, err = ioutil.ReadFile(preparePath(filePath))
	if err != nil {
		if os.IsNotExist(err) {
			return nil, errFileNotFound
		} else if os.IsPermission(err) {
			return nil, errFileAccessDenied
		} else if pathErr, ok := err.(*os.PathError); ok {
			if pathErr.Err == syscall.EISDIR {
				// Reading a directory is reported as file not found.
				return nil, errFileNotFound
			} else if strings.Contains(pathErr.Err.Error(), "The handle is invalid") {
				// This case is special and needs to be handled for windows.
				return nil, errFileNotFound
			}
			return nil, pathErr
		}
		return nil, err
	}
	return buf, nil
}
// ReadFile reads exactly len(buf) bytes into buf. It returns the // ReadFile reads exactly len(buf) bytes into buf. It returns the
// number of bytes copied. The error is EOF only if no bytes were // number of bytes copied. The error is EOF only if no bytes were
// read. On return, n == len(buf) if and only if err == nil. n == 0 // read. On return, n == len(buf) if and only if err == nil. n == 0
@ -386,10 +448,13 @@ func (s *posix) ReadFile(volume string, path string, offset int64, buf []byte) (
return 0, err return 0, err
} }
// Validate effective path length before reading.
filePath := pathJoin(volumeDir, path) filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil { if err = checkPathLength(filePath); err != nil {
return 0, err return 0, err
} }
// Open the file for reading.
file, err := os.Open(preparePath(filePath)) file, err := os.Open(preparePath(filePath))
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {

@ -0,0 +1,92 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"io/ioutil"
"os"
"testing"
)
// Tests the functionality implemented by ReadAll storage API.
func TestReadAll(t *testing.T) {
	// Scratch directory backing the posix layer, removed on exit.
	path, err := ioutil.TempDir(os.TempDir(), "minio-")
	if err != nil {
		t.Fatalf("Unable to create a temporary directory, %s", err)
	}
	defer removeAll(path)

	// Initialize posix storage layer.
	posix, err := newPosix(path)
	if err != nil {
		t.Fatalf("Unable to initialize posix, %s", err)
	}

	// Create the volume and the files the test cases operate on.
	if err = posix.MakeVol("exists"); err != nil {
		t.Fatalf("Unable to create a volume \"exists\", %s", err)
	}
	if err = posix.AppendFile("exists", "as-directory/as-file", []byte("Hello, World")); err != nil {
		t.Fatalf("Unable to create a file \"as-directory/as-file\", %s", err)
	}
	if err = posix.AppendFile("exists", "as-file", []byte("Hello, World")); err != nil {
		t.Fatalf("Unable to create a file \"as-file\", %s", err)
	}

	// Table of conditions that ReadAll must handle.
	testCases := []struct {
		volume string
		path   string
		err    error
	}{
		// Validate volume does not exist.
		{volume: "i-dont-exist", path: "", err: errVolumeNotFound},
		// Validate bad condition file does not exist.
		{volume: "exists", path: "as-file-not-found", err: errFileNotFound},
		// Validate bad condition file exists as prefix/directory and
		// we are attempting to read it.
		{volume: "exists", path: "as-directory", err: errFileNotFound},
		// Validate the good condition file exists and we are able to
		// read it.
		{volume: "exists", path: "as-file", err: nil},
		// Add more cases here.
	}

	// Run through all the test cases and validate for ReadAll.
	for i, testCase := range testCases {
		if _, err = posix.ReadAll(testCase.volume, testCase.path); err != testCase.err {
			t.Errorf("Test %d expected err %s, got err %s", i+1, testCase.err, err)
		}
	}
}

@ -168,6 +168,20 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er
return fileInfo, nil return fileInfo, nil
} }
// ReadAll - reads the entire contents of the file at path until EOF and
// returns them in a byte slice. buf is nil whenever err is non-nil.
// This API is meant to be used on files which have a small memory footprint;
// do not use this on large files as it would cause the server to crash.
func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) {
	args := ReadAllArgs{
		Vol:  volume,
		Path: path,
	}
	err = n.rpcClient.Call("Storage.ReadAllHandler", args, &buf)
	if err != nil {
		return nil, toStorageErr(err)
	}
	return buf, nil
}
// ReadFile - reads a file. // ReadFile - reads a file.
func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) {
if err = n.rpcClient.Call("Storage.ReadFileHandler", ReadFileArgs{ if err = n.rpcClient.Call("Storage.ReadFileHandler", ReadFileArgs{

@ -28,6 +28,15 @@ type ListVolsReply struct {
Vols []VolInfo Vols []VolInfo
} }
// ReadAllArgs represents the arguments for the Storage.ReadAllHandler RPC
// call, identifying the file whose entire contents should be returned.
type ReadAllArgs struct {
	// Name of the volume.
	Vol string

	// Name of the path.
	Path string
}
// ReadFileArgs represents read file RPC arguments. // ReadFileArgs represents read file RPC arguments.
type ReadFileArgs struct { type ReadFileArgs struct {
// Name of the volume. // Name of the volume.

@ -75,6 +75,16 @@ func (s *storageServer) ListDirHandler(arg *ListDirArgs, reply *[]string) error
return nil return nil
} }
// ReadAllHandler - RPC wrapper over the ReadAll storage API. It reads the
// entire file identified by arg and returns its contents through reply.
// Takes *ReadAllArgs to match what the networkStorage client sends.
func (s *storageServer) ReadAllHandler(arg *ReadAllArgs, reply *[]byte) error {
	buf, err := s.storage.ReadAll(arg.Vol, arg.Path)
	if err != nil {
		return err
	}
	// Assign through the pointer: `reply = &buf` would only rebind the local
	// parameter, and the RPC caller would always receive an empty slice.
	*reply = buf
	return nil
}
// ReadFileHandler - read file handler is rpc wrapper to read file. // ReadFileHandler - read file handler is rpc wrapper to read file.
func (s *storageServer) ReadFileHandler(arg *ReadFileArgs, reply *int64) error { func (s *storageServer) ReadFileHandler(arg *ReadFileArgs, reply *int64) error {
n, err := s.storage.ReadFile(arg.Vol, arg.Path, arg.Offset, arg.Buffer) n, err := s.storage.ReadFile(arg.Vol, arg.Path, arg.Offset, arg.Buffer)

@ -31,4 +31,7 @@ type StorageAPI interface {
RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error
StatFile(volume string, path string) (file FileInfo, err error) StatFile(volume string, path string) (file FileInfo, err error)
DeleteFile(volume string, path string) (err error) DeleteFile(volume string, path string) (err error)
// Read all.
ReadAll(volume string, path string) (buf []byte, err error)
} }

@ -17,7 +17,6 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"path" "path"
"sort" "sort"
@ -70,21 +69,15 @@ func (u uploadsV1) Index(uploadID string) int {
// readUploadsJSON - get all the saved uploads JSON. // readUploadsJSON - get all the saved uploads JSON.
func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) { func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) {
// Staging buffer of 128KiB kept for reading `uploads.json`.
var buf = make([]byte, 128*1024)
// Writer holding `uploads.json` content.
var buffer = new(bytes.Buffer)
uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
// Reads entire `uploads.json`. // Reads entire `uploads.json`.
if err = copyBuffer(buffer, disk, minioMetaBucket, uploadJSONPath, buf); err != nil { buf, err := disk.ReadAll(minioMetaBucket, uploadJSONPath)
if err != nil {
return uploadsV1{}, err return uploadsV1{}, err
} }
// Decode `uploads.json`. // Decode `uploads.json`.
d := json.NewDecoder(buffer) if err = json.Unmarshal(buf, &uploadIDs); err != nil {
if err = d.Decode(&uploadIDs); err != nil {
return uploadsV1{}, err return uploadsV1{}, err
} }

@ -17,7 +17,6 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"math/rand" "math/rand"
"path" "path"
@ -64,22 +63,16 @@ func randInts(count int) []int {
return ints return ints
} }
// readXLMeta reads `xl.json` returns contents as byte array. // readXLMeta reads `xl.json` and returns back XL metadata structure.
func readXLMeta(disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1, err error) { func readXLMeta(disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1, err error) {
// Allocate 32k buffer, this is sufficient for the most of `xl.json`.
buf := make([]byte, 128*1024)
// Allocate a new `xl.json` buffer writer.
var buffer = new(bytes.Buffer)
// Reads entire `xl.json`. // Reads entire `xl.json`.
if err = copyBuffer(buffer, disk, bucket, path.Join(object, xlMetaJSONFile), buf); err != nil { buf, err := disk.ReadAll(bucket, path.Join(object, xlMetaJSONFile))
if err != nil {
return xlMetaV1{}, err return xlMetaV1{}, err
} }
// Unmarshal xl metadata. // Unmarshal xl metadata.
d := json.NewDecoder(buffer) if err = json.Unmarshal(buf, &xlMeta); err != nil {
if err = d.Decode(&xlMeta); err != nil {
return xlMetaV1{}, err return xlMetaV1{}, err
} }

Loading…
Cancel
Save