@ -18,7 +18,6 @@ package main
import (
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"io"
@ -56,91 +55,39 @@ func closeAndRemoveWriters(writers ...io.WriteCloser) {
}
}
type quorumDisk struct {
disk StorageAPI
index int
}
// getQuorumDisks - get the current quorum disks.
func ( xl XL ) getQuorumDisks ( volume , path string ) ( quorumDisks [ ] quorumDisk , higherVersion int64 ) {
fileQuorumVersionMap := xl . getFileQuorumVersionMap ( volume , path )
for diskIndex , formatVersion := range fileQuorumVersionMap {
if formatVersion > higherVersion {
higherVersion = formatVersion
quorumDisks = [ ] quorumDisk { {
disk : xl . storageDisks [ diskIndex ] ,
index : diskIndex ,
} }
} else if formatVersion == higherVersion {
quorumDisks = append ( quorumDisks , quorumDisk {
disk : xl . storageDisks [ diskIndex ] ,
index : diskIndex ,
} )
}
}
return quorumDisks , higherVersion
}
func ( xl XL ) getFileQuorumVersionMap ( volume , path string ) map [ int ] int64 {
metadataFilePath := slashpath . Join ( path , metadataFile )
// Set offset to 0 to read entire file.
offset := int64 ( 0 )
metadata := make ( fileMetadata )
// Allocate disk index format map - do not use maps directly
// without allocating.
fileQuorumVersionMap := make ( map [ int ] int64 )
// Read meta data from all disks
for index , disk := range xl . storageDisks {
fileQuorumVersionMap [ index ] = - 1
metadataReader , err := disk . ReadFile ( volume , metadataFilePath , offset )
if err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Debugf ( "ReadFile failed with %s" , err )
continue
} else if err = json . NewDecoder ( metadataReader ) . Decode ( & metadata ) ; err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Debugf ( "JSON decoding failed with %s" , err )
continue
}
version , err := metadata . GetFileVersion ( )
if err == errMetadataKeyNotExist {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Debugf ( "Missing 'file.version', %s" , errMetadataKeyNotExist )
fileQuorumVersionMap [ index ] = 0
continue
}
if err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Debugf ( "'file.version' decoding failed with %s" , err )
continue
}
fileQuorumVersionMap [ index ] = version
}
return fileQuorumVersionMap
}
// WriteErasure reads predefined blocks, encodes them and writes to
// configured storage disks.
func ( xl XL ) writeErasure ( volume , path string , reader * io . PipeReader , wcloser * waitCloser ) {
// Release the block writer upon function return.
defer wcloser . release ( )
// Get available quorum for existing file path.
_ , higherVersion := xl . getQuorumDisks ( volume , path )
// Lock right before reading from disk.
readLock := true
xl . lockNS ( volume , path , readLock )
partsMetadata , errs := xl . getPartsMetadata ( volume , path )
xl . unlockNS ( volume , path , readLock )
// Count errors other than fileNotFound, bigger than the allowed
// readQuorum, if yes throw an error.
metadataReadErrCount := 0
for _ , err := range errs {
if err != nil && err != errFileNotFound {
metadataReadErrCount ++
if metadataReadErrCount > xl . readQuorum {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Errorf ( "%s" , err )
reader . CloseWithError ( err )
return
}
}
}
// List all the file versions on existing files.
versions , err := listFileVersions ( partsMetadata , errs )
// Get highest file version.
higherVersion := highestInt ( versions )
// Increment to have next higher version.
higherVersion ++
@ -156,12 +103,13 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
createFileError := 0
for index , disk := range xl . storageDisks {
erasurePart := slashpath . Join ( path , fmt . Sprintf ( "part.%d" , index ) )
writer , err := disk . CreateFile ( volume , erasurePart )
var writer io . WriteCloser
writer , err = disk . CreateFile ( volume , erasurePart )
if err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Debug f( "CreateFile failed with %s" , err )
} ) . Error f( "CreateFile failed with %s" , err )
createFileError ++
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
@ -183,7 +131,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Debug f( "CreateFile failed with %s" , err )
} ) . Error f( "CreateFile failed with %s" , err )
createFileError ++
// We can safely allow CreateFile errors up to
@ -208,10 +156,15 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
var totalSize int64 // Saves total incoming stream size.
for {
// Read up to allocated block size.
n , err := io . ReadFull ( reader , dataBuffer )
var n int
n , err = io . ReadFull ( reader , dataBuffer )
if err != nil {
// Any unexpected errors, close the pipe reader with error.
if err != io . ErrUnexpectedEOF && err != io . EOF {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Errorf ( "io.ReadFull failed with %s" , err )
// Remove all temp writers.
xl . cleanupCreateFileOps ( volume , path , append ( writers , metadataWriters ... ) ... )
reader . CloseWithError ( err )
@ -227,6 +180,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
var dataBlocks [ ] [ ] byte
dataBlocks , err = xl . ReedSolomon . Split ( dataBuffer [ 0 : n ] )
if err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Errorf ( "Splitting data buffer into erasure data blocks failed with %s" , err )
// Remove all temp writers.
xl . cleanupCreateFileOps ( volume , path , append ( writers , metadataWriters ... ) ... )
reader . CloseWithError ( err )
@ -236,6 +193,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
// Encode parity blocks using data blocks.
err = xl . ReedSolomon . Encode ( dataBlocks )
if err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
} ) . Errorf ( "Encoding erasure data blocks failed with %s" , err )
// Remove all temp writers upon error.
xl . cleanupCreateFileOps ( volume , path , append ( writers , metadataWriters ... ) ... )
reader . CloseWithError ( err )
@ -250,6 +211,11 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
encodedData := dataBlocks [ index ]
_ , err = writers [ index ] . Write ( encodedData )
if err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
"diskIndex" : index ,
} ) . Errorf ( "Writing encoded blocks failed with %s" , err )
// Remove all temp writers upon error.
xl . cleanupCreateFileOps ( volume , path , append ( writers , metadataWriters ... ) ... )
reader . CloseWithError ( err )
@ -297,8 +263,13 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
}
// Write metadata.
err : = metadata . Write ( metadataWriter )
err = metadata . Write ( metadataWriter )
if err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
"diskIndex" : index ,
} ) . Errorf ( "Writing metadata failed with %s" , err )
// Remove temporary files.
xl . cleanupCreateFileOps ( volume , path , append ( writers , metadataWriters ... ) ... )
reader . CloseWithError ( err )
@ -307,8 +278,9 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
}
// Lock right before commit to disk.
xl . lockNS ( volume , path , false )
defer xl . unlockNS ( volume , path , false )
readLock = false // false means writeLock.
xl . lockNS ( volume , path , readLock )
defer xl . unlockNS ( volume , path , readLock )
// Close all writers and metadata writers in routines.
for index , writer := range writers {
@ -316,13 +288,34 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
continue
}
// Safely wrote, now rename to its actual location.
writer . Close ( )
if err = writer . Close ( ) ; err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
"diskIndex" : index ,
} ) . Errorf ( "Safely committing part failed with %s" , err )
// Remove all temp writers upon error.
xl . cleanupCreateFileOps ( volume , path , append ( writers , metadataWriters ... ) ... )
reader . CloseWithError ( err )
return
}
if metadataWriters [ index ] == nil {
continue
}
// Safely wrote, now rename to its actual location.
metadataWriters [ index ] . Close ( )
if err = metadataWriters [ index ] . Close ( ) ; err != nil {
log . WithFields ( logrus . Fields {
"volume" : volume ,
"path" : path ,
"diskIndex" : index ,
} ) . Errorf ( "Safely committing metadata failed with %s" , err )
// Remove all temp writers upon error.
xl . cleanupCreateFileOps ( volume , path , append ( writers , metadataWriters ... ) ... )
reader . CloseWithError ( err )
return
}
}
// Close the pipe reader and return.