From 29838bb8510e4213e1427c382b1eb125060eed7f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 10 Jul 2015 14:04:37 -0700 Subject: [PATCH] Cleanup temporary writers upon errors during putObject(), all metadata() write operations --- pkg/donut/bucket.go | 32 ++++++++++++++++++++------------ pkg/donut/common.go | 9 +++++++++ pkg/donut/donut-v1.go | 23 ++++++++++++++++------- pkg/donut/donut-v2.go | 2 ++ pkg/donut/multipart.go | 2 ++ 5 files changed, 49 insertions(+), 19 deletions(-) diff --git a/pkg/donut/bucket.go b/pkg/donut/bucket.go index 0cf0d0f49..6c9e0b8c8 100644 --- a/pkg/donut/bucket.go +++ b/pkg/donut/bucket.go @@ -35,7 +35,6 @@ import ( "github.com/minio/minio/pkg/crypto/sha512" "github.com/minio/minio/pkg/donut/split" "github.com/minio/minio/pkg/iodine" - "github.com/minio/minio/pkg/utils/atomic" ) const ( @@ -228,7 +227,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 if objectName == "" || objectData == nil { return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil) } - writers, err := b.getWriters(normalizeObjectName(objectName), "data") + writers, err := b.getObjectWriters(normalizeObjectName(objectName), "data") if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } @@ -244,6 +243,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 mw := io.MultiWriter(writers[0], sumMD5, sum256, sum512) totalLength, err := io.Copy(mw, objectData) if err != nil { + CleanupWritersOnError(writers) return ObjectMetadata{}, iodine.New(err, nil) } objMetadata.Size = totalLength @@ -251,11 +251,13 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 // calculate data and parity dictated by total number of writers k, m, err := b.getDataAndParity(len(writers)) if err != nil { + CleanupWritersOnError(writers) return ObjectMetadata{}, iodine.New(err, nil) } // write encoded data with k, m and writers chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, sumMD5, sum256, sum512) if err != nil { + CleanupWritersOnError(writers) return ObjectMetadata{}, iodine.New(err, nil) } /// donutMetadata section @@ -273,13 +275,15 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 if signature != nil { ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sum256.Sum(nil))) if err != nil { + // error occurred while doing signature calculation, we return and also cleanup any temporary writers. + CleanupWritersOnError(writers) return ObjectMetadata{}, iodine.New(err, nil) } if !ok { // purge all writers, when control flow reaches here - for _, writer := range writers { - writer.(*atomic.File).CloseAndPurge() - } + // + // Signature mismatch occurred all temp files to be removed and all data purged. + CleanupWritersOnError(writers) return ObjectMetadata{}, iodine.New(SignatureDoesNotMatch{}, nil) } } @@ -295,6 +299,8 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 objMetadata.Metadata = metadata // write object specific metadata if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil { + // purge all writers, when control flow reaches here + CleanupWritersOnError(writers) return ObjectMetadata{}, iodine.New(err, nil) } // close all writers, when control flow reaches here @@ -328,13 +334,15 @@ func (b bucket) writeObjectMetadata(objectName string, objMetadata ObjectMetadat if objMetadata.Object == "" { return iodine.New(InvalidArgument{}, nil) } - objMetadataWriters, err := b.getWriters(objectName, objectMetadataConfig) + objMetadataWriters, err := b.getObjectWriters(objectName, objectMetadataConfig) if err != nil { return iodine.New(err, nil) } for _, objMetadataWriter := range objMetadataWriters { jenc := json.NewEncoder(objMetadataWriter) if err := jenc.Encode(&objMetadata); err != nil { + // Close writers and purge all temporary entries + CleanupWritersOnError(objMetadataWriters) return iodine.New(err, nil) } } @@ -350,7 +358,7 @@ func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) { if objectName == "" { return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil) } - objMetadataReaders, err := b.getReaders(objectName, objectMetadataConfig) + objMetadataReaders, err := b.getObjectReaders(objectName, objectMetadataConfig) if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } @@ -426,7 +434,7 @@ func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData // readObjectData - func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) { - readers, err := b.getReaders(objectName, "data") + readers, err := b.getObjectReaders(objectName, "data") if err != nil { writer.CloseWithError(iodine.New(err, nil)) return @@ -510,8 +518,8 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadC return decodedData, nil } -// getReaders - -func (b bucket) getReaders(objectName, objectMeta string) ([]io.ReadCloser, error) { +// getObjectReaders - +func (b bucket) getObjectReaders(objectName, objectMeta string) ([]io.ReadCloser, error) { var readers []io.ReadCloser nodeSlice := 0 for _, node := range b.nodes { @@ -534,8 +542,8 @@ func (b bucket) getReaders(objectName, objectMeta string) ([]io.ReadCloser, erro return readers, nil } -// getWriters - -func (b bucket) getWriters(objectName, objectMeta string) ([]io.WriteCloser, error) { +// getObjectWriters - +func (b bucket) getObjectWriters(objectName, objectMeta string) ([]io.WriteCloser, error) { var writers []io.WriteCloser nodeSlice := 0 for _, node := range b.nodes { diff --git a/pkg/donut/common.go b/pkg/donut/common.go index 9f71665de..92efa3de7 100644 --- a/pkg/donut/common.go +++ b/pkg/donut/common.go @@ -24,6 +24,8 @@ import ( "sort" "strings" "unicode/utf8" + + "github.com/minio/minio/pkg/utils/atomic" ) // IsValidBucket - verify bucket name in accordance with @@ -163,3 +165,10 @@ func SortU(objects []string) []string { sort.Strings(results) return results } + +// CleanupWritersOnError purge writers on error +func CleanupWritersOnError(writers []io.WriteCloser) { + for _, writer := range writers { + writer.(*atomic.File).CloseAndPurge() + } +} diff --git a/pkg/donut/donut-v1.go b/pkg/donut/donut-v1.go index 10b622dd7..8c36b0e97 100644 --- a/pkg/donut/donut-v1.go +++ b/pkg/donut/donut-v1.go @@ -42,6 +42,8 @@ const ( bucketMetadataVersion = "1.0.0" ) +/// v1 API functions + // makeBucket - make a new bucket func (donut API) makeBucket(bucket string, acl BucketACL) error { if bucket == "" || strings.TrimSpace(bucket) == "" { @@ -204,7 +206,9 @@ func (donut API) getObjectMetadata(bucket, object string) (ObjectMetadata, error return objectMetadata, nil } -// getDiskWriters - +//// internal functions + +// getBucketMetadataWriters - func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) { var writers []io.WriteCloser for _, node := range donut.nodes { @@ -213,8 +217,8 @@ func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) { return nil, iodine.New(err, nil) } writers = make([]io.WriteCloser, len(disks)) - for order, d := range disks { - bucketMetaDataWriter, err := d.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig)) + for order, dd := range disks { + bucketMetaDataWriter, err := dd.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig)) if err != nil { return nil, iodine.New(err, nil) } @@ -224,6 +228,7 @@ func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) { return writers, nil } +// getBucketMetadataReaders - func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) { var readers []io.ReadCloser for _, node := range donut.nodes { @@ -243,24 +248,26 @@ func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) { return readers, nil } -// +// setDonutBucketMetadata - func (donut API) setDonutBucketMetadata(metadata *AllBuckets) error { writers, err := donut.getBucketMetadataWriters() if err != nil { return iodine.New(err, nil) } - for _, writer := range writers { - defer writer.Close() - } for _, writer := range writers { jenc := json.NewEncoder(writer) if err := jenc.Encode(metadata); err != nil { + CleanupWritersOnError(writers) return iodine.New(err, nil) } } + for _, writer := range writers { + writer.Close() + } return nil } +// getDonutBucketMetadata - func (donut API) getDonutBucketMetadata() (*AllBuckets, error) { metadata := new(AllBuckets) readers, err := donut.getBucketMetadataReaders() @@ -280,6 +287,7 @@ func (donut API) getDonutBucketMetadata() (*AllBuckets, error) { return nil, iodine.New(InvalidArgument{}, nil) } +// makeDonutBucket - func (donut API) makeDonutBucket(bucketName, acl string) error { if err := donut.listDonutBuckets(); err != nil { return iodine.New(err, nil) @@ -329,6 +337,7 @@ func (donut API) makeDonutBucket(bucketName, acl string) error { return nil } +// listDonutBuckets - func (donut API) listDonutBuckets() error { for _, node := range donut.nodes { disks, err := node.ListDisks() diff --git a/pkg/donut/donut-v2.go b/pkg/donut/donut-v2.go index d976c1177..4a81e9ebf 100644 --- a/pkg/donut/donut-v2.go +++ b/pkg/donut/donut-v2.go @@ -123,6 +123,8 @@ func New() (Interface, error) { return a, nil } +/// V2 API functions + // GetObject - GET object from cache buffer func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) { donut.lock.Lock() diff --git a/pkg/donut/multipart.go b/pkg/donut/multipart.go index 25aeb8693..a27e0d2c4 100644 --- a/pkg/donut/multipart.go +++ b/pkg/donut/multipart.go @@ -38,6 +38,8 @@ import ( "github.com/minio/minio/pkg/iodine" ) +/// V2 API functions + // NewMultipartUpload - initiate a new multipart session func (donut API) NewMultipartUpload(bucket, key, contentType string, signature *Signature) (string, error) { donut.lock.Lock()