diff --git a/storage-errors.go b/storage-errors.go index d03d42943..ce1141b62 100644 --- a/storage-errors.go +++ b/storage-errors.go @@ -42,3 +42,6 @@ var errVolumeAccessDenied = errors.New("volume access denied") // errVolumeAccessDenied - cannot access file, insufficient permissions. var errFileAccessDenied = errors.New("file access denied") + +// errReadQuorum - did not meet read quorum. +var errReadQuorum = errors.New("I/O error. do not meet read quorum") diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go index 7bdaa197e..6e73e1f74 100644 --- a/xl-v1-createfile.go +++ b/xl-v1-createfile.go @@ -32,6 +32,15 @@ import ( // Erasure block size. const erasureBlockSize = 4 * 1024 * 1024 // 4MiB. +// cleanupCreateFileOps - cleans up all the temporary files and other +// temporary data upon any failure. +func (xl XL) cleanupCreateFileOps(volume, path string, writers ...io.WriteCloser) { + closeAndRemoveWriters(writers...) + for _, disk := range xl.storageDisks { + disk.DeleteFile(volume, path) + } +} + // Close and remove writers if they are safeFile. func closeAndRemoveWriters(writers ...io.WriteCloser) { for _, writer := range writers { @@ -54,9 +63,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { writers[index], err = disk.CreateFile(volume, erasurePart) if err != nil { // Remove previous temp writers for any failure. - closeAndRemoveWriters(writers...) - closeAndRemoveWriters(metadataWriters...) - deletePathAll(volume, path, xl.storageDisks...) + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } @@ -64,9 +71,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { metadataWriters[index], err = disk.CreateFile(volume, metadataFilePath) if err != nil { // Remove previous temp writers for any failure. - closeAndRemoveWriters(writers...) - closeAndRemoveWriters(metadataWriters...) - deletePathAll(volume, path, xl.storageDisks...) + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } @@ -83,9 +88,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { // Any unexpected errors, close the pipe reader with error. if err != io.ErrUnexpectedEOF && err != io.EOF { // Remove all temp writers. - closeAndRemoveWriters(writers...) - closeAndRemoveWriters(metadataWriters...) - deletePathAll(volume, path, xl.storageDisks...) + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } @@ -100,9 +103,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { blocks, err = xl.ReedSolomon.Split(buffer[0:n]) if err != nil { // Remove all temp writers. - closeAndRemoveWriters(writers...) - closeAndRemoveWriters(metadataWriters...) - deletePathAll(volume, path, xl.storageDisks...) + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } @@ -110,9 +111,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { err = xl.ReedSolomon.Encode(blocks) if err != nil { // Remove all temp writers upon error. - closeAndRemoveWriters(writers...) - closeAndRemoveWriters(metadataWriters...) - deletePathAll(volume, path, xl.storageDisks...) + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } @@ -121,9 +120,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { _, err = writers[index].Write(encodedData) if err != nil { // Remove all temp writers upon error. - closeAndRemoveWriters(writers...) - closeAndRemoveWriters(metadataWriters...) - deletePathAll(volume, path, xl.storageDisks...) + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } @@ -157,18 +154,16 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { // Marshal metadata into json strings. metadataBytes, err := json.Marshal(metadata) if err != nil { - closeAndRemoveWriters(writers...) - closeAndRemoveWriters(metadataWriters...) - deletePathAll(volume, path, xl.storageDisks...) + // Remove temporary files. + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } + // Write metadata to disk. _, err = metadataWriter.Write(metadataBytes) if err != nil { - closeAndRemoveWriters(writers...) - closeAndRemoveWriters(metadataWriters...) - deletePathAll(volume, path, xl.storageDisks...) + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } diff --git a/xl-v1-namespace.go b/xl-v1-namespace.go index f1adfe3a9..697c63d18 100644 --- a/xl-v1-namespace.go +++ b/xl-v1-namespace.go @@ -18,44 +18,56 @@ package main import "sync" +// nameSpaceParam - carries name space resource. type nameSpaceParam struct { volume string path string } +// nameSpaceLock - provides primitives for locking critical namespace regions. type nameSpaceLock struct { rwMutex *sync.RWMutex - count uint + rcount uint + wcount uint } -func (nsLock nameSpaceLock) InUse() bool { - return nsLock.count != 0 +func (nsLock *nameSpaceLock) InUse() bool { + return nsLock.rcount != 0 || nsLock.wcount != 0 } -func (nsLock nameSpaceLock) Lock() { +// Lock acquires write lock and increments the namespace counter. +func (nsLock *nameSpaceLock) Lock() { nsLock.Lock() - nsLock.count++ + nsLock.wcount++ } -func (nsLock nameSpaceLock) Unlock() { +// Unlock releases write lock and decrements the namespace counter. +func (nsLock *nameSpaceLock) Unlock() { nsLock.Unlock() - if nsLock.count != 0 { - nsLock.count-- + if nsLock.wcount != 0 { + nsLock.wcount-- } } -func (nsLock nameSpaceLock) RLock() { +// RLock acquires read lock and increments the namespace counter. +func (nsLock *nameSpaceLock) RLock() { nsLock.RLock() - nsLock.count++ + nsLock.rcount++ } -func (nsLock nameSpaceLock) RUnlock() { +// RUnlock release read lock and decrements the namespace counter. +func (nsLock *nameSpaceLock) RUnlock() { nsLock.RUnlock() - if nsLock.count != 0 { - nsLock.count-- + if nsLock.rcount != 0 { + nsLock.rcount-- } } -func newNameSpaceLock() nameSpaceLock { - return nameSpaceLock{rwMutex: &sync.RWMutex{}, count: 0} +// newNSLock - provides a new instance of namespace locking primitives. +func newNSLock() *nameSpaceLock { + return &nameSpaceLock{ + rwMutex: &sync.RWMutex{}, + rcount: 0, + wcount: 0, + } } diff --git a/xl-v1-readfile.go b/xl-v1-readfile.go index 54a611db1..650f5c011 100644 --- a/xl-v1-readfile.go +++ b/xl-v1-readfile.go @@ -133,17 +133,19 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) return nil, errInvalidArgument } - xl.lockNameSpace(volume, path, true) - defer xl.unlockNameSpace(volume, path, true) + // Acquire a read lock. + readLock := true + xl.lockNS(volume, path, readLock) + defer xl.unlockNS(volume, path, readLock) - // check read quorum + // Check read quorum. quorumDisks := xl.getReadFileQuorumDisks(volume, path) if len(quorumDisks) < xl.readQuorum { - return nil, errors.New("I/O error. do not meet read quorum") + return nil, errReadQuorum } - // get file size - size, err := xl.getFileSize(volume, path, quorumDisks[0].disk) + // Get file size. + fileSize, err := xl.getFileSize(volume, path, quorumDisks[0].disk) if err != nil { return nil, err } @@ -174,7 +176,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) // Initialize pipe. pipeReader, pipeWriter := io.Pipe() go func() { - var totalLeft = size + var totalLeft = fileSize // Read until the totalLeft. for totalLeft > 0 { // Figure out the right blockSize as it was encoded before. @@ -210,6 +212,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) pipeWriter.CloseWithError(err) return } + // Verify the blocks. var ok bool ok, err = xl.ReedSolomon.Verify(enBlocks) @@ -217,6 +220,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) pipeWriter.CloseWithError(err) return } + // Verification failed, blocks require reconstruction. if !ok { err = xl.ReedSolomon.Reconstruct(enBlocks) @@ -237,15 +241,18 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) return } } + // Join the decoded blocks. err = xl.ReedSolomon.Join(pipeWriter, enBlocks, curBlockSize) if err != nil { pipeWriter.CloseWithError(err) return } + // Save what's left after reading erasureBlockSize. totalLeft = totalLeft - erasureBlockSize } + // Cleanly end the pipe after a successful decoding. pipeWriter.Close() diff --git a/xl-v1.go b/xl-v1.go index 3025f6fb5..fdd248ccf 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -45,23 +45,25 @@ type XL struct { DataBlocks int ParityBlocks int storageDisks []StorageAPI - nameSpaceLockMap map[nameSpaceParam]nameSpaceLock + nameSpaceLockMap map[nameSpaceParam]*nameSpaceLock nameSpaceLockMapMutex *sync.Mutex readQuorum int writeQuorum int } -func (xl XL) lockNameSpace(volume, path string, readOnly bool) { +// lockNS - locks the given resource, using a previously allocated +// name space lock or initializing a new one. +func (xl XL) lockNS(volume, path string, readLock bool) { xl.nameSpaceLockMapMutex.Lock() defer xl.nameSpaceLockMapMutex.Unlock() param := nameSpaceParam{volume, path} nsLock, found := xl.nameSpaceLockMap[param] if !found { - nsLock = newNameSpaceLock() + nsLock = newNSLock() } - if readOnly { + if readLock { nsLock.RLock() } else { nsLock.Lock() @@ -70,13 +72,14 @@ func (xl XL) lockNameSpace(volume, path string, readOnly bool) { xl.nameSpaceLockMap[param] = nsLock } -func (xl XL) unlockNameSpace(volume, path string, readOnly bool) { +// unlockNS - unlocks any previously acquired read or write locks. +func (xl XL) unlockNS(volume, path string, readLock bool) { xl.nameSpaceLockMapMutex.Lock() defer xl.nameSpaceLockMapMutex.Unlock() param := nameSpaceParam{volume, path} if nsLock, found := xl.nameSpaceLockMap[param]; found { - if readOnly { + if readLock { nsLock.RUnlock() } else { nsLock.Unlock() @@ -136,10 +139,17 @@ func newXL(disks ...string) (StorageAPI, error) { // Save all the initialized storage disks. xl.storageDisks = storageDisks - xl.nameSpaceLockMap = make(map[nameSpaceParam]nameSpaceLock) + // Initialize name space lock map. + xl.nameSpaceLockMap = make(map[nameSpaceParam]*nameSpaceLock) xl.nameSpaceLockMapMutex = &sync.Mutex{} - xl.readQuorum = len(xl.storageDisks) / 2 - xl.writeQuorum = xl.readQuorum + 3 + + // Figure out read and write quorum based on number of storage disks. + // Read quorum should be always N/2 + 1 (due to Vandermonde matrix + // erasure requirements) + xl.readQuorum = len(xl.storageDisks)/2 + 1 + // Write quorum is assumed if we have total disks + 3 + // parity. (Need to discuss this again) + xl.writeQuorum = len(xl.storageDisks)/2 + 3 if xl.writeQuorum > len(xl.storageDisks) { xl.writeQuorum = len(xl.storageDisks) } @@ -432,13 +442,6 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { }, nil } -// Delete all path. -func deletePathAll(volume, path string, disks ...StorageAPI) { - for _, disk := range disks { - disk.DeleteFile(volume, path) - } -} - // DeleteFile - delete a file func (xl XL) DeleteFile(volume, path string) error { if !isValidVolname(volume) {