xl: Change fileMetadata to xlMetadata. (#1404)

Finalized backend format

```
{
    "version": "1.0.0",
    "stat": {
        "size": 24256,
        "modTime": "2016-04-28T00:11:37.843Z"
    },
    "erasure": {
        "data": 5,
        "parity": 5,
        "blockSize": 4194304
    ],
    "minio": {
        "release": "RELEASE.2016-04-28T00-09-47Z"
    }
}
```
master
Harshavardhana 9 years ago committed by Anand Babu (AB) Periasamy
parent 41b35cff7b
commit a1a667ae5d
  1. 39
      xl-json.md
  2. 67
      xl-v1-common.go
  3. 39
      xl-v1-createfile.go
  4. 27
      xl-v1-healfile.go
  5. 108
      xl-v1-metadata.go
  6. 29
      xl-v1-readfile.go
  7. 6
      xl-v1-utils.go
  8. 91
      xl-v1.go

@ -0,0 +1,39 @@
### xl.json
``xl.json`` is a special file captured and written by XL storage API layer
to interpret, manage and extract erasured data to multiple disks.
```json
{
"version": "1.0.0",
"stat": {
"size": 24256,
"modTime": "2016-04-28T00:11:37.843Z",
"version": 0
},
"erasure": {
"data": 5,
"parity": 5,
"blockSize": 4194304
],
"minio": {
"release": "RELEASE.2016-04-28T00-09-47Z"
}
}
```
#### JSON meaning.
- "version" // Version of the meta json file.
- "stat" // Stat value of written file.
- "size" // Size of the file.
- "modTime" // Modified time of the file.
- "version" // File version tracked when disks are down.
- "erasure" // Erasure metadata for the written file.
- "data" // Data blocks parts of the file.
- "parity" // Parity blocks parts of the file.
- "blockSize" // BlockSize read/write chunk size.

@ -17,7 +17,6 @@
package main
import (
"encoding/json"
"errors"
slashpath "path"
"path/filepath"
@ -37,27 +36,11 @@ func highestInt(intSlice []int64) (highestInteger int64) {
}
// Extracts file versions from partsMetadata slice and returns version slice.
func listFileVersions(partsMetadata []fileMetadata, errs []error) (versions []int64, err error) {
func listFileVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64, err error) {
versions = make([]int64, len(partsMetadata))
for index, metadata := range partsMetadata {
if errs[index] == nil {
var version int64
version, err = metadata.GetFileVersion()
if err == errMetadataKeyNotExist {
log.WithFields(logrus.Fields{
"metadata": metadata,
}).Errorf("Missing 'file.version', %s", errMetadataKeyNotExist)
versions[index] = 0
continue
}
if err != nil {
log.WithFields(logrus.Fields{
"metadata": metadata,
}).Errorf("'file.version' decoding failed with %s", err)
// Unexpected, return error.
return nil, err
}
versions[index] = version
versions[index] = metadata.Stat.Version
} else {
versions[index] = -1
}
@ -67,10 +50,10 @@ func listFileVersions(partsMetadata []fileMetadata, errs []error) (versions []in
// Returns slice of online disks needed.
// - slice returing readable disks.
// - fileMetadata
// - xlMetaV1
// - bool value indicating if healing is needed.
// - error if any.
func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata fileMetadata, heal bool, err error) {
func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata xlMetaV1, heal bool, err error) {
partsMetadata, errs := xl.getPartsMetadata(volume, path)
notFoundCount := 0
// FIXME: take care of the situation when a disk has failed and been removed
@ -82,8 +65,8 @@ func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mda
notFoundCount++
// If we have errors with file not found greater than allowed read
// quorum we return err as errFileNotFound.
if notFoundCount > xl.readQuorum {
return nil, fileMetadata{}, false, errFileNotFound
if notFoundCount > len(xl.storageDisks)-xl.readQuorum {
return nil, xlMetaV1{}, false, errFileNotFound
}
}
}
@ -96,7 +79,7 @@ func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mda
"volume": volume,
"path": path,
}).Errorf("Extracting file versions failed with %s", err)
return nil, fileMetadata{}, false, err
return nil, xlMetaV1{}, false, err
}
// Get highest file version.
@ -130,31 +113,31 @@ func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mda
"onlineDiskCount": onlineDiskCount,
"readQuorumCount": xl.readQuorum,
}).Errorf("%s", errReadQuorum)
return nil, fileMetadata{}, false, errReadQuorum
return nil, xlMetaV1{}, false, errReadQuorum
}
}
return onlineDisks, mdata, heal, nil
}
// Get parts.json metadata as a map slice.
// Get xl.json metadata as a map slice.
// Returns error slice indicating the failed metadata reads.
// Read lockNS() should be done by caller.
func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) {
func (xl XL) getPartsMetadata(volume, path string) ([]xlMetaV1, []error) {
errs := make([]error, len(xl.storageDisks))
metadataArray := make([]fileMetadata, len(xl.storageDisks))
metadataFilePath := slashpath.Join(path, metadataFile)
metadataArray := make([]xlMetaV1, len(xl.storageDisks))
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File)
for index, disk := range xl.storageDisks {
offset := int64(0)
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
metadataReader, err := disk.ReadFile(volume, xlMetaV1FilePath, offset)
if err != nil {
errs[index] = err
continue
}
defer metadataReader.Close()
metadata, err := fileMetadataDecode(metadataReader)
metadata, err := xlMetaV1Decode(metadataReader)
if err != nil {
// Unable to parse parts.json, set error.
// Unable to parse xl.json, set error.
errs[index] = err
continue
}
@ -163,38 +146,30 @@ func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) {
return metadataArray, errs
}
// Writes/Updates `parts.json` for given file. updateParts carries
// index of disks where `parts.json` needs to be updated.
// Writes/Updates `xl.json` for given file. updateParts carries
// index of disks where `xl.json` needs to be updated.
//
// Returns collection of errors, indexed in accordance with input
// updateParts order.
// Write lockNS() should be done by caller.
func (xl XL) setPartsMetadata(volume, path string, metadata fileMetadata, updateParts []bool) []error {
metadataFilePath := filepath.Join(path, metadataFile)
func (xl XL) setPartsMetadata(volume, path string, metadata xlMetaV1, updateParts []bool) []error {
xlMetaV1FilePath := filepath.Join(path, xlMetaV1File)
errs := make([]error, len(xl.storageDisks))
for index := range updateParts {
errs[index] = errors.New("Metadata not updated")
}
metadataBytes, err := json.Marshal(metadata)
if err != nil {
for index := range updateParts {
errs[index] = err
}
return errs
}
for index, shouldUpdate := range updateParts {
if !shouldUpdate {
continue
}
writer, err := xl.storageDisks[index].CreateFile(volume, metadataFilePath)
writer, err := xl.storageDisks[index].CreateFile(volume, xlMetaV1FilePath)
errs[index] = err
if err != nil {
continue
}
_, err = writer.Write(metadataBytes)
err = metadata.Write(writer)
if err != nil {
errs[index] = err
safeCloseAndRemove(writer)

@ -17,16 +17,12 @@
package main
import (
"encoding/hex"
"fmt"
"hash"
"io"
slashpath "path"
"strconv"
"time"
"github.com/Sirupsen/logrus"
fastSha512 "github.com/minio/minio/pkg/crypto/sha512"
)
// Erasure block size.
@ -92,9 +88,8 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
higherVersion++
writers := make([]io.WriteCloser, len(xl.storageDisks))
sha512Writers := make([]hash.Hash, len(xl.storageDisks))
metadataFilePath := slashpath.Join(path, metadataFile)
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File)
metadataWriters := make([]io.WriteCloser, len(xl.storageDisks))
// Save additional erasureMetadata.
@ -102,7 +97,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
createFileError := 0
for index, disk := range xl.storageDisks {
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
erasurePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
var writer io.WriteCloser
writer, err = disk.CreateFile(volume, erasurePart)
if err != nil {
@ -126,7 +121,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
// create meta data file
var metadataWriter io.WriteCloser
metadataWriter, err = disk.CreateFile(volume, metadataFilePath)
metadataWriter, err = disk.CreateFile(volume, xlMetaV1FilePath)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
@ -148,7 +143,6 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
writers[index] = writer
metadataWriters[index] = metadataWriter
sha512Writers[index] = fastSha512.New()
}
// Allocate 4MiB block size buffer for reading.
@ -221,9 +215,6 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
reader.CloseWithError(err)
return
}
if sha512Writers[index] != nil {
sha512Writers[index].Write(encodedData)
}
}
// Update total written.
@ -232,21 +223,19 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
}
// Initialize metadata map, save all erasure related metadata.
metadata := make(fileMetadata)
metadata.Set("version", minioVersion)
metadata.Set("format.major", "1")
metadata.Set("format.minor", "0")
metadata.Set("format.patch", "0")
metadata.Set("file.size", strconv.FormatInt(totalSize, 10))
metadata := xlMetaV1{}
metadata.Version = "1"
metadata.Stat.Size = totalSize
metadata.Stat.ModTime = modTime
metadata.Minio.Release = minioReleaseTag
if len(xl.storageDisks) > len(writers) {
// Save file.version only if we wrote to less disks than all
// storage disks.
metadata.Set("file.version", strconv.FormatInt(higherVersion, 10))
metadata.Stat.Version = higherVersion
}
metadata.Set("file.modTime", modTime.Format(timeFormatAMZ))
metadata.Set("file.xl.blockSize", strconv.Itoa(erasureBlockSize))
metadata.Set("file.xl.dataBlocks", strconv.Itoa(xl.DataBlocks))
metadata.Set("file.xl.parityBlocks", strconv.Itoa(xl.ParityBlocks))
metadata.Erasure.DataBlocks = xl.DataBlocks
metadata.Erasure.ParityBlocks = xl.ParityBlocks
metadata.Erasure.BlockSize = erasureBlockSize
// Write all the metadata.
// below case is not handled here
@ -257,10 +246,6 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
if metadataWriter == nil {
continue
}
if sha512Writers[index] != nil {
// Save sha512 checksum of each encoded blocks.
metadata.Set("file.xl.block512Sum", hex.EncodeToString(sha512Writers[index].Sum(nil)))
}
// Write metadata.
err = metadata.Write(metadataWriter)

@ -50,21 +50,12 @@ func (xl XL) healFile(volume string, path string) error {
return nil
}
size, err := metadata.GetSize()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("Failed to get file size, %s", err)
return err
}
for index, disk := range onlineDisks {
if disk == nil {
needsHeal[index] = true
continue
}
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
erasurePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
// If disk.ReadFile returns error and we don't have read quorum it will be taken care as
// ReedSolomon.Reconstruct() will fail later.
var reader io.ReadCloser
@ -93,7 +84,7 @@ func (xl XL) healFile(volume string, path string) error {
if !healNeeded {
continue
}
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
erasurePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
writers[index], err = xl.storageDisks[index].CreateFile(volume, erasurePart)
if err != nil {
log.WithFields(logrus.Fields{
@ -105,17 +96,17 @@ func (xl XL) healFile(volume string, path string) error {
return err
}
}
var totalLeft = size
var totalLeft = metadata.Stat.Size
for totalLeft > 0 {
// Figure out the right blockSize.
var curBlockSize int
if erasureBlockSize < totalLeft {
curBlockSize = erasureBlockSize
var curBlockSize int64
if metadata.Erasure.BlockSize < totalLeft {
curBlockSize = metadata.Erasure.BlockSize
} else {
curBlockSize = int(totalLeft)
curBlockSize = totalLeft
}
// Calculate the current block size.
curBlockSize = getEncodedBlockLen(curBlockSize, xl.DataBlocks)
curBlockSize = getEncodedBlockLen(curBlockSize, metadata.Erasure.DataBlocks)
enBlocks := make([][]byte, totalBlocks)
// Loop through all readers and read.
for index, reader := range readers {
@ -205,7 +196,7 @@ func (xl XL) healFile(volume string, path string) error {
return err
}
}
totalLeft = totalLeft - erasureBlockSize
totalLeft = totalLeft - metadata.Erasure.BlockSize
}
// After successful healing Close() the writer so that the temp

@ -18,53 +18,31 @@ package main
import (
"encoding/json"
"errors"
"io"
"strconv"
"time"
)
// error type when key is not found.
var errMetadataKeyNotExist = errors.New("Key not found in fileMetadata.")
// This code is built on similar ideas of http.Header.
// Ref - https://golang.org/pkg/net/http/#Header
// A fileMetadata represents a metadata header mapping
// keys to sets of values.
type fileMetadata map[string][]string
// Add adds the key, value pair to the header.
// It appends to any existing values associated with key.
func (f fileMetadata) Add(key, value string) {
f[key] = append(f[key], value)
}
// Set sets the header entries associated with key to
// the single element value. It replaces any existing
// values associated with key.
func (f fileMetadata) Set(key, value string) {
f[key] = []string{value}
}
// Get gets the first value associated with the given key.
// If there are no values associated with the key, Get returns "".
// Get is a convenience method. For more complex queries,
// access the map directly.
func (f fileMetadata) Get(key string) []string {
if f == nil {
return nil
}
v, ok := f[key]
if !ok {
return nil
// A xlMetaV1 represents a metadata header mapping keys to sets of values.
type xlMetaV1 struct {
Version string `json:"version"`
Stat struct {
Size int64 `json:"size"`
ModTime time.Time `json:"modTime"`
Version int64 `json:"version"`
} `json:"stat"`
Erasure struct {
DataBlocks int `json:"data"`
ParityBlocks int `json:"parity"`
BlockSize int64 `json:"blockSize"`
}
return v
Minio struct {
Release string `json:"release"`
} `json:"minio"`
}
// Write writes a metadata in wire format.
func (f fileMetadata) Write(writer io.Writer) error {
metadataBytes, err := json.Marshal(f)
func (m xlMetaV1) Write(writer io.Writer) error {
metadataBytes, err := json.Marshal(m)
if err != nil {
return err
}
@ -72,56 +50,12 @@ func (f fileMetadata) Write(writer io.Writer) error {
return err
}
// Get file size.
func (f fileMetadata) GetSize() (int64, error) {
sizes := f.Get("file.size")
if sizes == nil {
return 0, errMetadataKeyNotExist
}
sizeStr := sizes[0]
return strconv.ParseInt(sizeStr, 10, 64)
}
// Set file size.
func (f fileMetadata) SetSize(size int64) {
f.Set("file.size", strconv.FormatInt(size, 10))
}
// Get file Modification time.
func (f fileMetadata) GetModTime() (time.Time, error) {
timeStrs := f.Get("file.modTime")
if timeStrs == nil {
return time.Time{}, errMetadataKeyNotExist
}
return time.Parse(timeFormatAMZ, timeStrs[0])
}
// Set file Modification time.
func (f fileMetadata) SetModTime(modTime time.Time) {
f.Set("file.modTime", modTime.Format(timeFormatAMZ))
}
// Get file version.
func (f fileMetadata) GetFileVersion() (int64, error) {
version := f.Get("file.version")
if version == nil {
return 0, errMetadataKeyNotExist
}
return strconv.ParseInt(version[0], 10, 64)
}
// Set file version.
func (f fileMetadata) SetFileVersion(fileVersion int64) {
f.Set("file.version", strconv.FormatInt(fileVersion, 10))
}
// fileMetadataDecode - file metadata decode.
func fileMetadataDecode(reader io.Reader) (fileMetadata, error) {
metadata := make(fileMetadata)
// xlMetaV1Decode - file metadata decode.
func xlMetaV1Decode(reader io.Reader) (metadata xlMetaV1, err error) {
decoder := json.NewDecoder(reader)
// Unmarshalling failed, file possibly corrupted.
if err := decoder.Decode(&metadata); err != nil {
return nil, err
if err = decoder.Decode(&metadata); err != nil {
return xlMetaV1{}, err
}
return metadata, nil
}

@ -62,15 +62,6 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
}()
}
fileSize, err := metadata.GetSize()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("Failed to get file size, %s", err)
return nil, err
}
// Acquire read lock again.
xl.lockNS(volume, path, readLock)
readers := make([]io.ReadCloser, len(xl.storageDisks))
@ -78,7 +69,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
if disk == nil {
continue
}
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
erasurePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
// If disk.ReadFile returns error and we don't have read quorum it will be taken care as
// ReedSolomon.Reconstruct() will fail later.
var reader io.ReadCloser
@ -91,18 +82,18 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
go func() {
var totalLeft = fileSize
var totalLeft = metadata.Stat.Size
// Read until the totalLeft.
for totalLeft > 0 {
// Figure out the right blockSize as it was encoded before.
var curBlockSize int
if erasureBlockSize < totalLeft {
curBlockSize = erasureBlockSize
var curBlockSize int64
if metadata.Erasure.BlockSize < totalLeft {
curBlockSize = metadata.Erasure.BlockSize
} else {
curBlockSize = int(totalLeft)
curBlockSize = totalLeft
}
// Calculate the current encoded block size.
curEncBlockSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
curEncBlockSize := getEncodedBlockLen(curBlockSize, metadata.Erasure.DataBlocks)
enBlocks := make([][]byte, len(xl.storageDisks))
// Loop through all readers and read.
for index, reader := range readers {
@ -117,8 +108,6 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
}
}
// TODO need to verify block512Sum.
// Check blocks if they are all zero in length.
if checkBlockSize(enBlocks) == 0 {
log.WithFields(logrus.Fields{
@ -181,7 +170,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
}
// Join the decoded blocks.
err = xl.ReedSolomon.Join(pipeWriter, enBlocks, curBlockSize)
err = xl.ReedSolomon.Join(pipeWriter, enBlocks, int(curBlockSize))
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
@ -192,7 +181,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
}
// Save what's left after reading erasureBlockSize.
totalLeft = totalLeft - erasureBlockSize
totalLeft = totalLeft - metadata.Erasure.BlockSize
}
// Cleanly end the pipe after a successful decoding.

@ -30,7 +30,7 @@ func checkBlockSize(blocks [][]byte) int {
// calculate the blockSize based on input length and total number of
// data blocks.
func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) {
curBlockSize = (inputLen + dataBlocks - 1) / dataBlocks
return
func getEncodedBlockLen(inputLen int64, dataBlocks int) (curBlockSize int64) {
curBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
return curBlockSize
}

@ -30,7 +30,7 @@ import (
const (
// Part metadata file.
metadataFile = "part.json"
xlMetaV1File = "xl.json"
// Maximum erasure blocks.
maxErasureBlocks = 16
)
@ -325,32 +325,32 @@ func isLeafDirectory(disk StorageAPI, volume, leafPath string) (isLeaf bool) {
return true
}
// extractMetadata - extract file metadata.
func extractMetadata(disk StorageAPI, volume, path string) (fileMetadata, error) {
metadataFilePath := slashpath.Join(path, metadataFile)
// extractMetadata - extract xl metadata.
func extractMetadata(disk StorageAPI, volume, path string) (xlMetaV1, error) {
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File)
// We are not going to read partial data from metadata file,
// read the whole file always.
offset := int64(0)
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
metadataReader, err := disk.ReadFile(volume, xlMetaV1FilePath, offset)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": metadataFilePath,
"path": xlMetaV1FilePath,
"offset": offset,
}).Errorf("ReadFile failed with %s", err)
return nil, err
return xlMetaV1{}, err
}
// Close metadata reader.
defer metadataReader.Close()
metadata, err := fileMetadataDecode(metadataReader)
metadata, err := xlMetaV1Decode(metadataReader)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": metadataFilePath,
"path": xlMetaV1FilePath,
"offset": offset,
}).Errorf("fileMetadataDecode failed with %s", err)
return nil, err
}).Errorf("xlMetaV1Decode failed with %s", err)
return xlMetaV1{}, err
}
return metadata, nil
}
@ -369,25 +369,9 @@ func extractFileInfo(disk StorageAPI, volume, path string) (FileInfo, error) {
}).Errorf("extractMetadata failed with %s", err)
return FileInfo{}, err
}
fileSize, err := metadata.GetSize()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("GetSize failed with %s", err)
return FileInfo{}, err
}
fileModTime, err := metadata.GetModTime()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("GetModTime failed with %s", err)
return FileInfo{}, err
}
fileInfo.Size = fileSize
fileInfo.Mode = os.FileMode(0644)
fileInfo.ModTime = fileModTime
fileInfo.Size = metadata.Stat.Size
fileInfo.ModTime = metadata.Stat.ModTime
fileInfo.Mode = os.FileMode(0644) // This is a file already.
return fileInfo, nil
}
@ -458,7 +442,7 @@ func listFiles(disk StorageAPI, volume, prefix, marker string, recursive bool, c
if isLeaf {
// For leaf for now we just point to the first block, make it
// dynamic in future based on the availability of storage disks.
markerPath = slashpath.Join(marker, metadataFile)
markerPath = slashpath.Join(marker, xlMetaV1File)
}
}
@ -478,7 +462,7 @@ func listFiles(disk StorageAPI, volume, prefix, marker string, recursive bool, c
}
for _, fsFileInfo := range fsFilesInfo {
// Skip metadata files.
if strings.HasSuffix(fsFileInfo.Name, metadataFile) {
if strings.HasSuffix(fsFileInfo.Name, xlMetaV1File) {
continue
}
var fileInfo FileInfo
@ -518,9 +502,8 @@ func listFiles(disk StorageAPI, volume, prefix, marker string, recursive bool, c
// markerPath for the next disk.ListFiles() iteration.
markerPath = fsFilesInfo[len(fsFilesInfo)-1].Name
}
if count == 0 && recursive && !strings.HasSuffix(markerPath, metadataFile) {
// If last entry is not part.json then loop once more to check if we
// have reached eof.
if count == 0 && recursive && !strings.HasSuffix(markerPath, xlMetaV1File) {
// If last entry is not xl.json then loop once more to check if we have reached eof.
fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, 1)
if err != nil {
log.WithFields(logrus.Fields{
@ -533,17 +516,17 @@ func listFiles(disk StorageAPI, volume, prefix, marker string, recursive bool, c
return nil, true, err
}
if !eof {
// part.N and part.json are always in pairs and hence this
// entry has to be part.json. If not better to manually investigate
// file.N and xl.json are always in pairs and hence this
// entry has to be xl.json. If not better to manually investigate
// and fix it.
// For the next ListFiles() call we can safely assume that the
// marker is "object/part.json"
if !strings.HasSuffix(fsFilesInfo[0].Name, metadataFile) {
// marker is "object/xl.json"
if !strings.HasSuffix(fsFilesInfo[0].Name, xlMetaV1File) {
log.WithFields(logrus.Fields{
"volume": volume,
"prefix": prefix,
"fsFileInfo.Name": fsFilesInfo[0].Name,
}).Errorf("ListFiles failed with %s, expected %s to be a part.json file.", err, fsFilesInfo[0].Name)
}).Errorf("ListFiles failed with %s, expected %s to be a xl.json file.", err, fsFilesInfo[0].Name)
return nil, true, errUnexpected
}
}
@ -594,30 +577,12 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
}()
}
// Extract metadata.
size, err := metadata.GetSize()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("GetSize failed with %s", err)
return FileInfo{}, err
}
modTime, err := metadata.GetModTime()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("GetModTime failed with %s", err)
return FileInfo{}, err
}
// Return file info.
return FileInfo{
Volume: volume,
Name: path,
Size: size,
ModTime: modTime,
Size: metadata.Stat.Size,
ModTime: metadata.Stat.ModTime,
Mode: os.FileMode(0644),
}, nil
}
@ -632,7 +597,7 @@ func (xl XL) DeleteFile(volume, path string) error {
}
// Loop through and delete each chunks.
for index, disk := range xl.storageDisks {
erasureFilePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
erasureFilePart := slashpath.Join(path, fmt.Sprintf("file.%d", index))
err := disk.DeleteFile(volume, erasureFilePart)
if err != nil {
log.WithFields(logrus.Fields{
@ -641,8 +606,8 @@ func (xl XL) DeleteFile(volume, path string) error {
}).Errorf("DeleteFile failed with %s", err)
return err
}
metadataFilePath := slashpath.Join(path, metadataFile)
err = disk.DeleteFile(volume, metadataFilePath)
xlMetaV1FilePath := slashpath.Join(path, xlMetaV1File)
err = disk.DeleteFile(volume, xlMetaV1FilePath)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,

Loading…
Cancel
Save