list/xl: Fix the way marker is handled in leafDirectory verification.

master
Harshavardhana 9 years ago committed by Harshavardhana
parent c302875774
commit c7bf471c9e
  1. 2
      api-router.go
  2. 4
      object-api.go
  3. 19
      routers.go
  4. 10
      storage-rpc-server.go
  5. 2
      web-router.go
  6. 256
      xl-v1-healfile.go
  7. 10
      xl-v1-readfile.go
  8. 238
      xl-v1.go

@ -20,7 +20,7 @@ import router "github.com/gorilla/mux"
// objectAPIHandler implements and provides http handlers for S3 API.
type objectAPIHandlers struct {
ObjectAPI *objectAPI
ObjectAPI objectAPI
}
// registerAPIRouter - registers S3 compatible APIs.

@ -33,8 +33,8 @@ type objectAPI struct {
storage StorageAPI
}
func newObjectLayer(storage StorageAPI) *objectAPI {
return &objectAPI{storage}
func newObjectLayer(storage StorageAPI) objectAPI {
return objectAPI{storage}
}
/// Bucket operations

@ -26,7 +26,7 @@ import (
// configureServer handler returns final handler for the http server.
func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
var storageHandlers StorageAPI
var storageAPI StorageAPI
var e error
if len(srvCmdConfig.exportPaths) == 1 {
// Verify if export path is a local file system path.
@ -34,37 +34,40 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
st, e = os.Stat(srvCmdConfig.exportPaths[0])
if e == nil && st.Mode().IsDir() {
// Initialize storage API.
storageHandlers, e = newFS(srvCmdConfig.exportPaths[0])
storageAPI, e = newFS(srvCmdConfig.exportPaths[0])
fatalIf(probe.NewError(e), "Initializing fs failed.", nil)
} else {
// Initialize network storage API.
storageHandlers, e = newNetworkFS(srvCmdConfig.exportPaths[0])
storageAPI, e = newNetworkFS(srvCmdConfig.exportPaths[0])
fatalIf(probe.NewError(e), "Initializing network fs failed.", nil)
}
} else {
// Initialize XL storage API.
storageHandlers, e = newXL(srvCmdConfig.exportPaths...)
storageAPI, e = newXL(srvCmdConfig.exportPaths...)
fatalIf(probe.NewError(e), "Initializing XL failed.", nil)
}
// Initialize object layer.
objectAPI := newObjectLayer(storageHandlers)
objAPI := newObjectLayer(storageAPI)
// Initialize storage rpc.
storageRPC := newStorageRPC(storageAPI)
// Initialize API.
apiHandlers := objectAPIHandlers{
ObjectAPI: objectAPI,
ObjectAPI: objAPI,
}
// Initialize Web.
webHandlers := &webAPIHandlers{
ObjectAPI: objectAPI,
ObjectAPI: objAPI,
}
// Initialize router.
mux := router.NewRouter()
// Register all routers.
registerStorageRPCRouter(mux, storageHandlers)
registerStorageRPCRouter(mux, storageRPC)
registerWebRouter(mux, webHandlers)
registerAPIRouter(mux, apiHandlers)
// Add new routers here.

@ -99,11 +99,15 @@ func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericRep
return nil
}
// registerStorageRPCRouter - register storage rpc router.
func registerStorageRPCRouter(mux *router.Router, storageAPI StorageAPI) {
stServer := &storageServer{
// Initialize new storage rpc.
func newStorageRPC(storageAPI StorageAPI) *storageServer {
return &storageServer{
storage: storageAPI,
}
}
// registerStorageRPCRouter - register storage rpc router.
func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) {
storageRPCServer := rpc.NewServer()
storageRPCServer.RegisterName("Storage", stServer)
storageRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()

@ -30,7 +30,7 @@ import (
// webAPI container for Web API.
type webAPIHandlers struct {
ObjectAPI *objectAPI
ObjectAPI objectAPI
}
// indexHandler - Handler to serve index.html

@ -0,0 +1,256 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
slashpath "path"
"strconv"
)
func (xl XL) selfHeal(volume string, path string) error {
totalShards := xl.DataBlocks + xl.ParityBlocks
needsSelfHeal := make([]bool, totalShards)
var metadata = make(map[string]string)
var readers = make([]io.Reader, totalShards)
var writers = make([]io.WriteCloser, totalShards)
for index, disk := range xl.storageDisks {
metadataFile := slashpath.Join(path, metadataFile)
// Start from the beginning, we are not reading partial metadata files.
offset := int64(0)
metadataReader, err := disk.ReadFile(volume, metadataFile, offset)
if err != nil {
if err != errFileNotFound {
continue
}
// Needs healing if part.json is not found
needsSelfHeal[index] = true
continue
}
defer metadataReader.Close()
decoder := json.NewDecoder(metadataReader)
if err = decoder.Decode(&metadata); err != nil {
// needs healing if parts.json is not parsable
needsSelfHeal[index] = true
}
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
erasuredPartReader, err := disk.ReadFile(volume, erasurePart, offset)
if err != nil {
if err == errFileNotFound {
// Needs healing if part file not found
needsSelfHeal[index] = true
}
return err
}
readers[index] = erasuredPartReader
defer erasuredPartReader.Close()
}
// Check if there is atleast one part that needs to be healed.
atleastOneSelfHeal := false
for _, shNeeded := range needsSelfHeal {
if shNeeded {
atleastOneSelfHeal = true
break
}
}
if !atleastOneSelfHeal {
// Return if healing not needed anywhere.
return nil
}
// create writers for parts where healing is needed.
for index, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
var err error
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
writers[index], err = xl.storageDisks[index].CreateFile(volume, erasurePart)
if err != nil {
// Unexpected error
closeAndRemoveWriters(writers...)
return err
}
}
size, err := strconv.ParseInt(metadata["file.size"], 10, 64)
if err != nil {
closeAndRemoveWriters(writers...)
return err
}
var totalLeft = size
for totalLeft > 0 {
// Figure out the right blockSize.
var curBlockSize int
if erasureBlockSize < totalLeft {
curBlockSize = erasureBlockSize
} else {
curBlockSize = int(totalLeft)
}
// Calculate the current shard size.
curShardSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
enShards := make([][]byte, totalShards)
// Loop through all readers and read.
for index, reader := range readers {
// Initialize shard slice and fill the data from each parts.
// ReedSolomon.Verify() expects that slice is not nil even if the particular
// part needs healing.
enShards[index] = make([]byte, curShardSize)
if needsSelfHeal[index] {
// Skip reading if the part needs healing.
continue
}
_, e := io.ReadFull(reader, enShards[index])
if e != nil && e != io.ErrUnexpectedEOF {
enShards[index] = nil
}
}
// Check blocks if they are all zero in length.
if checkBlockSize(enShards) == 0 {
err = errors.New("Data likely corrupted, all blocks are zero in length.")
return err
}
// Verify the shards.
ok, e := xl.ReedSolomon.Verify(enShards)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
// Verification failed, shards require reconstruction.
if !ok {
for index, shNeeded := range needsSelfHeal {
if shNeeded {
// Reconstructs() reconstructs the parts if the array is nil.
enShards[index] = nil
}
}
e = xl.ReedSolomon.Reconstruct(enShards)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
// Verify reconstructed shards again.
ok, e = xl.ReedSolomon.Verify(enShards)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
if !ok {
// Shards cannot be reconstructed, corrupted data.
e = errors.New("Verification failed after reconstruction, data likely corrupted.")
closeAndRemoveWriters(writers...)
return e
}
}
for index, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
_, e := writers[index].Write(enShards[index])
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
}
totalLeft = totalLeft - erasureBlockSize
}
// After successful healing Close() the writer so that the temp
// files are committed to their location.
for index, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
writers[index].Close()
}
// Write part.json where ever healing was done.
var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks))
for index, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
metadataFile := slashpath.Join(path, metadataFile)
metadataWriters[index], err = xl.storageDisks[index].CreateFile(volume, metadataFile)
if err != nil {
closeAndRemoveWriters(writers...)
return err
}
}
metadataBytes, err := json.Marshal(metadata)
if err != nil {
closeAndRemoveWriters(metadataWriters...)
return err
}
for index, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
_, err = metadataWriters[index].Write(metadataBytes)
if err != nil {
closeAndRemoveWriters(metadataWriters...)
return err
}
}
// Metadata written for all the healed parts hence Close() so that
// temp files can be committed.
for index := range xl.storageDisks {
if !needsSelfHeal[index] {
continue
}
metadataWriters[index].Close()
}
return nil
}
// self heal.
type selfHeal struct {
volume string
path string
errCh chan<- error
}
// selfHealRoutine - starts a go routine and listens on a channel for healing requests.
func (xl *XL) selfHealRoutine() {
xl.selfHealCh = make(chan selfHeal)
// Healing request can be made like this:
// errCh := make(chan error)
// xl.selfHealCh <- selfHeal{"testbucket", "testobject", errCh}
// fmt.Println(<-errCh)
go func() {
for sh := range xl.selfHealCh {
if sh.volume == "" || sh.path == "" {
sh.errCh <- errors.New("volume or path can not be empty")
continue
}
xl.selfHeal(sh.volume, sh.path)
sh.errCh <- nil
}
}()
}

@ -90,7 +90,7 @@ func (xl XL) getReadFileQuorumDisks(volume, path string) (quorumDisks []quorumDi
for disk, version := range diskVersionMap {
if version > higherVersion {
higherVersion = version
quorumDisks = []quorumDisk{quorumDisk{disk, i}}
quorumDisks = []quorumDisk{{disk, i}}
} else if version == higherVersion {
quorumDisks = append(quorumDisks, quorumDisk{disk, i})
}
@ -133,10 +133,10 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
return nil, errInvalidArgument
}
// Acquire a read lock.
readLock := true
xl.lockNS(volume, path, readLock)
defer xl.unlockNS(volume, path, readLock)
// Acquire a read lock. - TODO - disable this due to stack overflow bug.
// readLock := true
// xl.lockNS(volume, path, readLock)
// defer xl.unlockNS(volume, path, readLock)
// Check read quorum.
quorumDisks := xl.getReadFileQuorumDisks(volume, path)

@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
slashpath "path"
"sort"
@ -40,13 +39,6 @@ const (
maxErasureBlocks = 16
)
// Self Heal data
type selfHeal struct {
volume string
fsPath string
errCh chan error
}
// XL layer structure.
type XL struct {
ReedSolomon reedsolomon.Encoder // Erasure encoder/decoder.
@ -57,6 +49,8 @@ type XL struct {
nameSpaceLockMapMutex *sync.Mutex
readQuorum int
writeQuorum int
// Heal input/output channel.
selfHealCh chan selfHeal
}
@ -163,7 +157,7 @@ func newXL(disks ...string) (StorageAPI, error) {
xl.writeQuorum = len(xl.storageDisks)
}
// Start self heal go routine.
// Start self heal go routine, taking inputs over self heal channel.
xl.selfHealRoutine()
// Return successfully initialized.
@ -227,8 +221,9 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
// format it means that the parent directory is the actual object name.
func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) {
var allFileInfos []FileInfo
var markerPath string
for {
fileInfos, eof, e := xl.storageDisks[0].ListFiles(volume, leafPath, "", false, 1000)
fileInfos, eof, e := xl.storageDisks[0].ListFiles(volume, leafPath, markerPath, false, 1000)
if e != nil {
break
}
@ -236,6 +231,8 @@ func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) {
if eof {
break
}
// MarkerPath to get the next set of files.
markerPath = allFileInfos[len(allFileInfos)-1].Name
}
for _, fileInfo := range allFileInfos {
if fileInfo.Mode.IsDir() {
@ -477,224 +474,3 @@ func (xl XL) DeleteFile(volume, path string) error {
}
return nil
}
// selfHeal - called by the healing go-routine, heals using erasure coding.
func (xl XL) selfHeal(volume string, fsPath string) error {
totalShards := xl.DataBlocks + xl.ParityBlocks
needsSelfHeal := make([]bool, totalShards)
var metadata = make(map[string]string)
var readers = make([]io.Reader, totalShards)
var writers = make([]io.WriteCloser, totalShards)
for index, disk := range xl.storageDisks {
metadataFile := slashpath.Join(fsPath, metadataFile)
// Start from the beginning, we are not reading partial metadata files.
offset := int64(0)
metadataReader, err := disk.ReadFile(volume, metadataFile, offset)
if err != nil {
if err != errFileNotFound {
continue
}
// Needs healing if part.json is not found
needsSelfHeal[index] = true
continue
}
defer metadataReader.Close()
decoder := json.NewDecoder(metadataReader)
if err = decoder.Decode(&metadata); err != nil {
// needs healing if parts.json is not parsable
needsSelfHeal[index] = true
}
erasurePart := slashpath.Join(fsPath, fmt.Sprintf("part.%d", index))
erasuredPartReader, err := disk.ReadFile(volume, erasurePart, offset)
if err != nil {
if err != errFileNotFound {
continue
}
// needs healing if part file not found
needsSelfHeal[index] = true
} else {
readers[index] = erasuredPartReader
defer erasuredPartReader.Close()
}
}
// Check if there is atleat one part that needs to be healed.
atleastOneSelfHeal := false
for _, shNeeded := range needsSelfHeal {
if shNeeded {
atleastOneSelfHeal = true
break
}
}
if !atleastOneSelfHeal {
// return if healing not needed anywhere.
return nil
}
// create writers for parts where healing is needed.
for i, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
var err error
erasurePart := slashpath.Join(fsPath, fmt.Sprintf("part.%d", i))
writers[i], err = xl.storageDisks[i].CreateFile(volume, erasurePart)
if err != nil {
// Unexpected error
closeAndRemoveWriters(writers...)
return err
}
}
size, err := strconv.ParseInt(metadata["file.size"], 10, 64)
if err != nil {
closeAndRemoveWriters(writers...)
return err
}
var totalLeft = size
for totalLeft > 0 {
// Figure out the right blockSize.
var curBlockSize int
if erasureBlockSize < totalLeft {
curBlockSize = erasureBlockSize
} else {
curBlockSize = int(totalLeft)
}
// Calculate the current shard size.
curShardSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
enShards := make([][]byte, totalShards)
// Loop through all readers and read.
for i, reader := range readers {
// Initialize shard slice and fill the data from each parts.
// ReedSolomon.Verify() expects that slice is not nil even if the particular
// part needs healing.
enShards[i] = make([]byte, curShardSize)
if needsSelfHeal[i] {
// Skip reading if the part needs healing.
continue
}
_, e := io.ReadFull(reader, enShards[i])
if e != nil && e != io.ErrUnexpectedEOF {
enShards[i] = nil
}
}
// Check blocks if they are all zero in length.
if checkBlockSize(enShards) == 0 {
err = errors.New("Data likely corrupted, all blocks are zero in length.")
return err
}
// Verify the shards.
ok, e := xl.ReedSolomon.Verify(enShards)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
// Verification failed, shards require reconstruction.
if !ok {
for i, shNeeded := range needsSelfHeal {
if shNeeded {
// Reconstructs() reconstructs the parts if the array is nil.
enShards[i] = nil
}
}
e = xl.ReedSolomon.Reconstruct(enShards)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
// Verify reconstructed shards again.
ok, e = xl.ReedSolomon.Verify(enShards)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
if !ok {
// Shards cannot be reconstructed, corrupted data.
e = errors.New("Verification failed after reconstruction, data likely corrupted.")
closeAndRemoveWriters(writers...)
return e
}
}
for i, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
_, e := writers[i].Write(enShards[i])
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
}
totalLeft = totalLeft - erasureBlockSize
}
// After successful healing Close() the writer so that the temp files are renamed.
for i, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
writers[i].Close()
}
// Write part.json where ever healing was done.
var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks))
for i, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
metadataFile := slashpath.Join(fsPath, metadataFile)
metadataWriters[i], err = xl.storageDisks[i].CreateFile(volume, metadataFile)
if err != nil {
closeAndRemoveWriters(writers...)
return err
}
}
metadataBytes, err := json.Marshal(metadata)
if err != nil {
closeAndRemoveWriters(metadataWriters...)
return err
}
for i, shNeeded := range needsSelfHeal {
if !shNeeded {
continue
}
_, err := metadataWriters[i].Write(metadataBytes)
if err != nil {
closeAndRemoveWriters(metadataWriters...)
return err
}
}
// part.json written for all the healed parts hence Close() so that temp files can be renamed.
for index := range xl.storageDisks {
if !needsSelfHeal[index] {
continue
}
metadataWriters[index].Close()
}
return nil
}
// selfHealRoutine - starts a go routine and listens on a channel for healing requests.
func (xl *XL) selfHealRoutine() {
xl.selfHealCh = make(chan selfHeal)
// Healing request can be made like this:
// errCh := make(chan error)
// xl.selfHealCh <- selfHeal{"testbucket", "testobject", errCh}
// fmt.Println(<-errCh)
go func() {
for sh := range xl.selfHealCh {
if sh.volume == "" || sh.fsPath == "" {
sh.errCh <- errors.New("volume or path can not be empty")
continue
}
xl.selfHeal(sh.volume, sh.fsPath)
sh.errCh <- nil
}
}()
}

Loading…
Cancel
Save