xl: Simplify reading metadata and add a new fileMetadata type. (#1346)

master
Harshavardhana 9 years ago committed by Harshavardhana
parent f3784d1087
commit 9bd9441107
  1. 49
      xl-v1-createfile.go
  2. 31
      xl-v1-errors.go
  3. 78
      xl-v1-metadata.go
  4. 10
      xl-v1-readfile.go
  5. 13
      xl-v1-utils.go
  6. 150
      xl-v1.go

@ -79,7 +79,7 @@ func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 {
metadataFilePath := slashpath.Join(path, metadataFile) metadataFilePath := slashpath.Join(path, metadataFile)
// Set offset to 0 to read entire file. // Set offset to 0 to read entire file.
offset := int64(0) offset := int64(0)
metadata := make(map[string]string) metadata := make(fileMetadata)
// Allocate disk index format map - do not use maps directly // Allocate disk index format map - do not use maps directly
// without allocating. // without allocating.
@ -94,22 +94,21 @@ func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 {
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
if err != nil { if err != nil {
continue continue
} else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil { } else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
continue continue
}
} else if _, ok := metadata["file.version"]; !ok { if version := metadata.Get("file.version"); version == nil {
fileQuorumVersionMap[index] = 0 fileQuorumVersionMap[index] = 0
} else {
}
// Convert string to integer. // Convert string to integer.
fileVersion, err := strconv.ParseInt(metadata["file.version"], 10, 64) fileVersion, err := strconv.ParseInt(version[0], 10, 64)
if err != nil { if err != nil {
continue continue
} }
fileQuorumVersionMap[index] = fileVersion fileQuorumVersionMap[index] = fileVersion
} }
}
return fileQuorumVersionMap return fileQuorumVersionMap
} }
@ -238,21 +237,21 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
} }
// Initialize metadata map, save all erasure related metadata. // Initialize metadata map, save all erasure related metadata.
metadata := make(map[string]string) metadata := make(fileMetadata)
metadata["version"] = minioVersion metadata.Set("version", minioVersion)
metadata["format.major"] = "1" metadata.Set("format.major", "1")
metadata["format.minor"] = "0" metadata.Set("format.minor", "0")
metadata["format.patch"] = "0" metadata.Set("format.patch", "0")
metadata["file.size"] = strconv.FormatInt(totalSize, 10) metadata.Set("file.size", strconv.FormatInt(totalSize, 10))
if len(xl.storageDisks) > len(writers) { if len(xl.storageDisks) > len(writers) {
// Save file.version only if we wrote to less disks than all // Save file.version only if we wrote to less disks than all
// storage disks. // storage disks.
metadata["file.version"] = strconv.FormatInt(higherVersion, 10) metadata.Set("file.version", strconv.FormatInt(higherVersion, 10))
} }
metadata["file.modTime"] = modTime.Format(timeFormatAMZ) metadata.Set("file.modTime", modTime.Format(timeFormatAMZ))
metadata["file.xl.blockSize"] = strconv.Itoa(erasureBlockSize) metadata.Set("file.xl.blockSize", strconv.Itoa(erasureBlockSize))
metadata["file.xl.dataBlocks"] = strconv.Itoa(xl.DataBlocks) metadata.Set("file.xl.dataBlocks", strconv.Itoa(xl.DataBlocks))
metadata["file.xl.parityBlocks"] = strconv.Itoa(xl.ParityBlocks) metadata.Set("file.xl.parityBlocks", strconv.Itoa(xl.ParityBlocks))
// Write all the metadata. // Write all the metadata.
// below case is not handled here // below case is not handled here
@ -265,25 +264,17 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
} }
if sha512Writers[index] != nil { if sha512Writers[index] != nil {
// Save sha512 checksum of each encoded blocks. // Save sha512 checksum of each encoded blocks.
metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[index].Sum(nil)) metadata.Set("file.xl.block512Sum", hex.EncodeToString(sha512Writers[index].Sum(nil)))
} }
// Marshal metadata into json strings. // Write metadata.
metadataBytes, err := json.Marshal(metadata) err := metadata.Write(metadataWriter)
if err != nil { if err != nil {
// Remove temporary files. // Remove temporary files.
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
// Write metadata to disk.
_, err = metadataWriter.Write(metadataBytes)
if err != nil {
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err)
return
}
} }
// Close all writers and metadata writers in routines. // Close all writers and metadata writers in routines.

@ -0,0 +1,31 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "errors"
// errFileSize - returned for missing file size.
var errFileSize = errors.New("Missing 'file.size' in metadata")
// errMaxDisks - returned for reached maximum of disks.
var errMaxDisks = errors.New("Total number of disks specified is higher than supported maximum of '16'")
// errNumDisks - returned for odd numebr of disks.
var errNumDisks = errors.New("Invalid number of disks provided, should be always multiples of '2'")
// errModTime - returned for missing file modtime.
var errModTime = errors.New("Missing 'file.modTime' in metadata")

@ -0,0 +1,78 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/json"
"io"
)
// This code is built on similar ideas of http.Header.
// Ref - https://golang.org/pkg/net/http/#Header
// A fileMetadata represents a metadata header mapping
// keys to sets of values.
type fileMetadata map[string][]string
// Add adds the key, value pair to the header.
// It appends to any existing values associated with key.
func (f fileMetadata) Add(key, value string) {
f[key] = append(f[key], value)
}
// Set sets the header entries associated with key to
// the single element value. It replaces any existing
// values associated with key.
func (f fileMetadata) Set(key, value string) {
f[key] = []string{value}
}
// Get gets the first value associated with the given key.
// If there are no values associated with the key, Get returns "".
// Get is a convenience method. For more complex queries,
// access the map directly.
func (f fileMetadata) Get(key string) []string {
if f == nil {
return nil
}
v, ok := f[key]
if !ok {
return nil
}
return v
}
// Write writes a metadata in wire format.
func (f fileMetadata) Write(writer io.Writer) error {
metadataBytes, err := json.Marshal(f)
if err != nil {
return err
}
_, err = writer.Write(metadataBytes)
return err
}
// fileMetadataDecode - file metadata decode.
func fileMetadataDecode(reader io.Reader) (fileMetadata, error) {
metadata := make(fileMetadata)
decoder := json.NewDecoder(reader)
// Unmarshalling failed, file possibly corrupted.
if err := decoder.Decode(&metadata); err != nil {
return nil, err
}
return metadata, nil
}

@ -55,9 +55,9 @@ func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, int64, error)
fileSize := int64(0) fileSize := int64(0)
for index, metadata := range partsMetadata { for index, metadata := range partsMetadata {
if errs[index] == nil { if errs[index] == nil {
if versionStr, ok := metadata["file.version"]; ok { if version := metadata.Get("file.version"); version != nil {
// Convert string to integer. // Convert string to integer.
version, err := strconv.ParseInt(versionStr, 10, 64) version, err := strconv.ParseInt(version[0], 10, 64)
if err != nil { if err != nil {
// Unexpected, return error. // Unexpected, return error.
return nil, 0, err return nil, 0, err
@ -91,15 +91,15 @@ func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, int64, error)
if disk == nil { if disk == nil {
continue continue
} }
if sizeStr, ok := partsMetadata[index]["file.size"]; ok { if size := partsMetadata[index].Get("file.size"); size != nil {
var err error var err error
fileSize, err = strconv.ParseInt(sizeStr, 10, 64) fileSize, err = strconv.ParseInt(size[0], 10, 64)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
break break
} else { } else {
return nil, 0, errors.New("Missing 'file.size' in meta data.") return nil, 0, errFileSize
} }
} }
return quorumDisks, fileSize, nil return quorumDisks, fileSize, nil

@ -9,12 +9,11 @@ import (
// Get parts.json metadata as a map slice. // Get parts.json metadata as a map slice.
// Returns error slice indicating the failed metadata reads. // Returns error slice indicating the failed metadata reads.
func (xl XL) getPartsMetadata(volume, path string) ([]map[string]string, []error) { func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) {
errs := make([]error, len(xl.storageDisks)) errs := make([]error, len(xl.storageDisks))
metadataArray := make([]map[string]string, len(xl.storageDisks)) metadataArray := make([]fileMetadata, len(xl.storageDisks))
metadataFilePath := slashpath.Join(path, metadataFile) metadataFilePath := slashpath.Join(path, metadataFile)
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
metadata := make(map[string]string)
offset := int64(0) offset := int64(0)
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
if err != nil { if err != nil {
@ -23,8 +22,8 @@ func (xl XL) getPartsMetadata(volume, path string) ([]map[string]string, []error
} }
defer metadataReader.Close() defer metadataReader.Close()
decoder := json.NewDecoder(metadataReader) metadata, err := fileMetadataDecode(metadataReader)
if err = decoder.Decode(&metadata); err != nil { if err != nil {
// Unable to parse parts.json, set error. // Unable to parse parts.json, set error.
errs[index] = err errs[index] = err
continue continue
@ -39,12 +38,12 @@ func (xl XL) getPartsMetadata(volume, path string) ([]map[string]string, []error
// //
// Returns collection of errors, indexed in accordance with input // Returns collection of errors, indexed in accordance with input
// updateParts order. // updateParts order.
func (xl XL) setPartsMetadata(volume, path string, metadata map[string]string, updateParts []bool) []error { func (xl XL) setPartsMetadata(volume, path string, metadata fileMetadata, updateParts []bool) []error {
metadataFilePath := filepath.Join(path, metadataFile) metadataFilePath := filepath.Join(path, metadataFile)
errs := make([]error, len(xl.storageDisks)) errs := make([]error, len(xl.storageDisks))
for index := range updateParts { for index := range updateParts {
errs[index] = errors.New("metadata not updated") errs[index] = errors.New("Metadata not updated")
} }
metadataBytes, err := json.Marshal(metadata) metadataBytes, err := json.Marshal(metadata)

@ -17,9 +17,6 @@
package main package main
import ( import (
"encoding/hex"
"encoding/json"
"errors"
"fmt" "fmt"
"os" "os"
slashpath "path" slashpath "path"
@ -102,7 +99,7 @@ func newXL(disks ...string) (StorageAPI, error) {
// Verify disks. // Verify disks.
totalDisks := len(disks) totalDisks := len(disks)
if totalDisks > maxErasureBlocks { if totalDisks > maxErasureBlocks {
return nil, errors.New("Total number of disks specified is higher than supported maximum of '16'") return nil, errMaxDisks
} }
// isEven function to verify if a given number if even. // isEven function to verify if a given number if even.
@ -112,7 +109,7 @@ func newXL(disks ...string) (StorageAPI, error) {
// TODO: verify if this makes sense in future. // TODO: verify if this makes sense in future.
if !isEven(totalDisks) { if !isEven(totalDisks) {
return nil, errors.New("Invalid number of directories provided, should be always multiples of '2'") return nil, errNumDisks
} }
// Calculate data and parity blocks. // Calculate data and parity blocks.
@ -125,9 +122,9 @@ func newXL(disks ...string) (StorageAPI, error) {
} }
// Save the reedsolomon. // Save the reedsolomon.
xl.ReedSolomon = rs
xl.DataBlocks = dataBlocks xl.DataBlocks = dataBlocks
xl.ParityBlocks = parityBlocks xl.ParityBlocks = parityBlocks
xl.ReedSolomon = rs
// Initialize all storage disks. // Initialize all storage disks.
storageDisks := make([]StorageAPI, len(disks)) storageDisks := make([]StorageAPI, len(disks))
@ -150,6 +147,7 @@ func newXL(disks ...string) (StorageAPI, error) {
// Read quorum should be always N/2 + 1 (due to Vandermonde matrix // Read quorum should be always N/2 + 1 (due to Vandermonde matrix
// erasure requirements) // erasure requirements)
xl.readQuorum = len(xl.storageDisks)/2 + 1 xl.readQuorum = len(xl.storageDisks)/2 + 1
// Write quorum is assumed if we have total disks + 3 // Write quorum is assumed if we have total disks + 3
// parity. (Need to discuss this again) // parity. (Need to discuss this again)
xl.writeQuorum = len(xl.storageDisks)/2 + 3 xl.writeQuorum = len(xl.storageDisks)/2 + 3
@ -302,16 +300,21 @@ func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) {
return isLeaf return isLeaf
} }
// fileMetadata - file metadata is a structured representation of the // Returns file size from the metadata.
// unmarshalled metadata file. func getFileSize(metadata fileMetadata) (int64, error) {
type fileMetadata struct { size := metadata.Get("file.size")
Size int64 if size == nil {
ModTime time.Time return 0, errFileSize
BlockSize int64 }
Block512Sum string return strconv.ParseInt(size[0], 10, 64)
DataBlocks int }
ParityBlocks int
fileVersion int64 func getModTime(metadata fileMetadata) (time.Time, error) {
modTime := metadata.Get("file.modTime")
if modTime == nil {
return time.Time{}, errModTime
}
return time.Parse(timeFormatAMZ, modTime[0])
} }
// extractMetadata - extract file metadata. // extractMetadata - extract file metadata.
@ -323,83 +326,40 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) {
disk := xl.storageDisks[0] disk := xl.storageDisks[0]
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
if err != nil { if err != nil {
return fileMetadata{}, err return nil, err
} }
// Close metadata reader. // Close metadata reader.
defer metadataReader.Close() defer metadataReader.Close()
var metadata = make(map[string]string) metadata, err := fileMetadataDecode(metadataReader)
decoder := json.NewDecoder(metadataReader)
// Unmarshalling failed, file possibly corrupted.
if err = decoder.Decode(&metadata); err != nil {
return fileMetadata{}, err
}
modTime, err := time.Parse(timeFormatAMZ, metadata["file.modTime"])
if err != nil { if err != nil {
return fileMetadata{}, err return nil, err
}
// Verify if size is parsable.
var size int64
size, err = strconv.ParseInt(metadata["file.size"], 10, 64)
if err != nil {
return fileMetadata{}, err
} }
return metadata, nil
}
// Verify if file.version is parsable. // Extract file info from paths.
var fileVersion int64 func (xl XL) extractFileInfo(volume, path string) (FileInfo, error) {
// missing file.version is valid fileInfo := FileInfo{}
if _, ok := metadata["file.version"]; ok { fileInfo.Volume = volume
fileVersion, err = strconv.ParseInt(metadata["file.version"], 10, 64) fileInfo.Name = path
if err != nil {
return fileMetadata{}, err
}
}
// Verify if block size is parsable. metadata, err := xl.extractMetadata(volume, path)
var blockSize int64
blockSize, err = strconv.ParseInt(metadata["file.xl.blockSize"], 10, 64)
if err != nil { if err != nil {
return fileMetadata{}, err return FileInfo{}, err
} }
fileSize, err := getFileSize(metadata)
// Verify if data blocks and parity blocks are parsable.
var dataBlocks, parityBlocks int
dataBlocks, err = strconv.Atoi(metadata["file.xl.dataBlocks"])
if err != nil { if err != nil {
return fileMetadata{}, err return FileInfo{}, err
} }
parityBlocks, err = strconv.Atoi(metadata["file.xl.parityBlocks"]) fileModTime, err := getModTime(metadata)
if err != nil { if err != nil {
return fileMetadata{}, err return FileInfo{}, err
} }
fileInfo.Size = fileSize
// Verify if sha512sum is of proper hex format. fileInfo.Mode = os.FileMode(0644)
sha512Sum := metadata["file.xl.block512Sum"] fileInfo.ModTime = fileModTime
_, err = hex.DecodeString(sha512Sum) return fileInfo, nil
if err != nil {
return fileMetadata{}, err
}
// Return the concocted metadata.
return fileMetadata{
Size: size,
ModTime: modTime,
BlockSize: blockSize,
Block512Sum: sha512Sum,
DataBlocks: dataBlocks,
ParityBlocks: parityBlocks,
fileVersion: fileVersion,
}, nil
}
const (
slashSeparator = "/"
)
// retainSlash - retains slash from a path.
func retainSlash(path string) string {
return strings.TrimSuffix(path, slashSeparator) + slashSeparator
} }
// byFileInfoName is a collection satisfying sort.Interface. // byFileInfoName is a collection satisfying sort.Interface.
@ -427,21 +387,6 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int)
} }
} }
// Extract file info from paths.
extractFileInfo := func(volume, path string) (FileInfo, error) {
var fileInfo = FileInfo{}
var metadata fileMetadata
fileInfo.Name = slashpath.Dir(path)
metadata, err = xl.extractMetadata(volume, fileInfo.Name)
if err != nil {
return FileInfo{}, err
}
fileInfo.Size = metadata.Size
fileInfo.ModTime = metadata.ModTime
fileInfo.Mode = os.FileMode(0644)
return fileInfo, nil
}
// List files. // List files.
fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count) fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count)
if err != nil { if err != nil {
@ -459,7 +404,10 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int)
isLeaf = xl.isLeafDirectory(volume, fsFileInfo.Name) isLeaf = xl.isLeafDirectory(volume, fsFileInfo.Name)
} }
if isLeaf || !fsFileInfo.Mode.IsDir() { if isLeaf || !fsFileInfo.Mode.IsDir() {
fileInfo, err = extractFileInfo(volume, fsFileInfo.Name) // Extract the parent of leaf directory or file to get the
// actual name.
path := slashpath.Dir(fsFileInfo.Name)
fileInfo, err = xl.extractFileInfo(volume, path)
if err != nil { if err != nil {
// For a leaf directory, if err is FileNotFound then // For a leaf directory, if err is FileNotFound then
// perhaps has a missing metadata. Ignore it and let // perhaps has a missing metadata. Ignore it and let
@ -491,19 +439,13 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
} }
// Extract metadata. // Extract metadata.
metadata, err := xl.extractMetadata(volume, path) fileInfo, err := xl.extractFileInfo(volume, path)
if err != nil { if err != nil {
return FileInfo{}, err return FileInfo{}, err
} }
// Return file info. // Return fileinfo.
return FileInfo{ return fileInfo, nil
Volume: volume,
Name: path,
Size: metadata.Size,
ModTime: metadata.ModTime,
Mode: os.FileMode(0644),
}, nil
} }
// DeleteFile - delete a file // DeleteFile - delete a file

Loading…
Cancel
Save