From 42254b5c4d2d2270af1bcab3fa38f8cee219294a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 25 Apr 2016 16:00:22 -0700 Subject: [PATCH] xl: Rename blockingWriteCloser to waitCloser. (#1376) --- network-fs.go | 2 +- xl-v1-createfile.go | 25 +++++++++----- ...1-blockingwriter.go => xl-v1-waitcloser.go | 34 ++++++++++--------- 3 files changed, 35 insertions(+), 26 deletions(-) rename xl-v1-blockingwriter.go => xl-v1-waitcloser.go (60%) diff --git a/network-fs.go b/network-fs.go index 5c0fff73c..855794e50 100644 --- a/network-fs.go +++ b/network-fs.go @@ -184,7 +184,7 @@ func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("CreateFile http POST failed to upload the data with error %s", err) + }).Debugf("CreateFile HTTP POST failed to upload data with error %s", err) readCloser.CloseWithError(err) return } diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go index 003a2bb15..c2b38e47d 100644 --- a/xl-v1-createfile.go +++ b/xl-v1-createfile.go @@ -38,14 +38,21 @@ const erasureBlockSize = 4 * 1024 * 1024 // 4MiB. func (xl XL) cleanupCreateFileOps(volume, path string, writers ...io.WriteCloser) { closeAndRemoveWriters(writers...) for _, disk := range xl.storageDisks { - disk.DeleteFile(volume, path) + if err := disk.DeleteFile(volume, path); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("DeleteFile failed with %s", err) + } } } // Close and remove writers if they are safeFile. func closeAndRemoveWriters(writers ...io.WriteCloser) { for _, writer := range writers { - safeCloseAndRemove(writer) + if err := safeCloseAndRemove(writer); err != nil { + log.Errorf("Closing writer failed with %s", err) + } } } @@ -128,9 +135,9 @@ func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 { // WriteErasure reads predefined blocks, encodes them and writes to // configured storage disks. -func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, bwriter *blockingWriteCloser) { +func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *waitCloser) { // Release the block writer upon function return. - defer bwriter.Release() + defer wcloser.release() // Get available quorum for existing file path. _, higherVersion := xl.getQuorumDisks(volume, path) @@ -335,12 +342,12 @@ func (xl XL) CreateFile(volume, path string) (writeCloser io.WriteCloser, err er // Initialize pipe for data pipe line. pipeReader, pipeWriter := io.Pipe() - // Initialize a new blocking writer closer. - blockingWriter := newBlockingWriteCloser(pipeWriter) + // Initialize a new wait closer, implements both Write and Close. + wcloser := newWaitCloser(pipeWriter) // Start erasure encoding in routine, reading data block by block from pipeReader. - go xl.writeErasure(volume, path, pipeReader, blockingWriter) + go xl.writeErasure(volume, path, pipeReader, wcloser) - // Return the blocking writer, caller should start writing to this. - return blockingWriter, nil + // Return the writer, caller should start writing to this. + return wcloser, nil } diff --git a/xl-v1-blockingwriter.go b/xl-v1-waitcloser.go similarity index 60% rename from xl-v1-blockingwriter.go rename to xl-v1-waitcloser.go index d5b4097ab..4af554378 100644 --- a/xl-v1-blockingwriter.go +++ b/xl-v1-waitcloser.go @@ -21,42 +21,44 @@ import ( "sync" ) -// blockingWriteCloser is a WriteCloser that blocks until released. -type blockingWriteCloser struct { - writer io.WriteCloser // Embedded writer. - release *sync.WaitGroup // Waitgroup for atomicity. - err error +// waitCloser implements a Closer that blocks until released, this +// prevents the writer closing one end of a pipe while making sure +// that all data is written and committed to disk on the end. +// Additionally this also implements Write(). +type waitCloser struct { + wg *sync.WaitGroup // Waitgroup for atomicity. + writer io.WriteCloser // Embedded writer. } // Write to the underlying writer. -func (b *blockingWriteCloser) Write(data []byte) (int, error) { +func (b *waitCloser) Write(data []byte) (int, error) { return b.writer.Write(data) } // Close blocks until another goroutine calls Release(error). Returns // error code if either writer fails or Release is called with an error. -func (b *blockingWriteCloser) Close() error { +func (b *waitCloser) Close() error { err := b.writer.Close() - b.release.Wait() + b.wg.Wait() return err } -// Release the Close, causing it to unblock. Only call this +// release the Close, causing it to unblock. Only call this // once. Calling it multiple times results in a panic. -func (b *blockingWriteCloser) Release() { - b.release.Done() +func (b *waitCloser) release() { + b.wg.Done() return } -// newBlockingWriteCloser Creates a new write closer that must be +// newWaitCloser creates a new write closer that must be // released by the read consumer. -func newBlockingWriteCloser(writer io.WriteCloser) *blockingWriteCloser { +func newWaitCloser(writer io.WriteCloser) *waitCloser { // Wait group for the go-routine. wg := &sync.WaitGroup{} // Add to the wait group to wait for. wg.Add(1) - return &blockingWriteCloser{ - writer: writer, - release: wg, + return &waitCloser{ + wg: wg, + writer: writer, } }