Merge pull request #732 from harshavardhana/pr_out_cleanup_temporary_writers_upon_errors_during_putobject_all_metadata_write_operations

Cleanup temporary writers upon errors during putObject(), all metadata() write operations
master
Harshavardhana 10 years ago
commit d0eb4a2aea
  1. 32
      pkg/donut/bucket.go
  2. 9
      pkg/donut/common.go
  3. 23
      pkg/donut/donut-v1.go
  4. 2
      pkg/donut/donut-v2.go
  5. 2
      pkg/donut/multipart.go

@ -35,7 +35,6 @@ import (
"github.com/minio/minio/pkg/crypto/sha512" "github.com/minio/minio/pkg/crypto/sha512"
"github.com/minio/minio/pkg/donut/split" "github.com/minio/minio/pkg/donut/split"
"github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/utils/atomic"
) )
const ( const (
@ -228,7 +227,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
if objectName == "" || objectData == nil { if objectName == "" || objectData == nil {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil) return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
} }
writers, err := b.getWriters(normalizeObjectName(objectName), "data") writers, err := b.getObjectWriters(normalizeObjectName(objectName), "data")
if err != nil { if err != nil {
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
@ -244,6 +243,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
mw := io.MultiWriter(writers[0], sumMD5, sum256, sum512) mw := io.MultiWriter(writers[0], sumMD5, sum256, sum512)
totalLength, err := io.Copy(mw, objectData) totalLength, err := io.Copy(mw, objectData)
if err != nil { if err != nil {
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
objMetadata.Size = totalLength objMetadata.Size = totalLength
@ -251,11 +251,13 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
// calculate data and parity dictated by total number of writers // calculate data and parity dictated by total number of writers
k, m, err := b.getDataAndParity(len(writers)) k, m, err := b.getDataAndParity(len(writers))
if err != nil { if err != nil {
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
// write encoded data with k, m and writers // write encoded data with k, m and writers
chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, sumMD5, sum256, sum512) chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, sumMD5, sum256, sum512)
if err != nil { if err != nil {
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
/// donutMetadata section /// donutMetadata section
@ -273,13 +275,15 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
if signature != nil { if signature != nil {
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sum256.Sum(nil))) ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sum256.Sum(nil)))
if err != nil { if err != nil {
// error occurred while doing signature calculation, we return and also cleanup any temporary writers.
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
if !ok { if !ok {
// purge all writers, when control flow reaches here // purge all writers, when control flow reaches here
for _, writer := range writers { //
writer.(*atomic.File).CloseAndPurge() // Signature mismatch occurred all temp files to be removed and all data purged.
} CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(SignatureDoesNotMatch{}, nil) return ObjectMetadata{}, iodine.New(SignatureDoesNotMatch{}, nil)
} }
} }
@ -295,6 +299,8 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
objMetadata.Metadata = metadata objMetadata.Metadata = metadata
// write object specific metadata // write object specific metadata
if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil { if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil {
// purge all writers, when control flow reaches here
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
// close all writers, when control flow reaches here // close all writers, when control flow reaches here
@ -328,13 +334,15 @@ func (b bucket) writeObjectMetadata(objectName string, objMetadata ObjectMetadat
if objMetadata.Object == "" { if objMetadata.Object == "" {
return iodine.New(InvalidArgument{}, nil) return iodine.New(InvalidArgument{}, nil)
} }
objMetadataWriters, err := b.getWriters(objectName, objectMetadataConfig) objMetadataWriters, err := b.getObjectWriters(objectName, objectMetadataConfig)
if err != nil { if err != nil {
return iodine.New(err, nil) return iodine.New(err, nil)
} }
for _, objMetadataWriter := range objMetadataWriters { for _, objMetadataWriter := range objMetadataWriters {
jenc := json.NewEncoder(objMetadataWriter) jenc := json.NewEncoder(objMetadataWriter)
if err := jenc.Encode(&objMetadata); err != nil { if err := jenc.Encode(&objMetadata); err != nil {
// Close writers and purge all temporary entries
CleanupWritersOnError(objMetadataWriters)
return iodine.New(err, nil) return iodine.New(err, nil)
} }
} }
@ -350,7 +358,7 @@ func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) {
if objectName == "" { if objectName == "" {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil) return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
} }
objMetadataReaders, err := b.getReaders(objectName, objectMetadataConfig) objMetadataReaders, err := b.getObjectReaders(objectName, objectMetadataConfig)
if err != nil { if err != nil {
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
@ -426,7 +434,7 @@ func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData
// readObjectData - // readObjectData -
func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) { func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) {
readers, err := b.getReaders(objectName, "data") readers, err := b.getObjectReaders(objectName, "data")
if err != nil { if err != nil {
writer.CloseWithError(iodine.New(err, nil)) writer.CloseWithError(iodine.New(err, nil))
return return
@ -510,8 +518,8 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadC
return decodedData, nil return decodedData, nil
} }
// getReaders - // getObjectReaders -
func (b bucket) getReaders(objectName, objectMeta string) ([]io.ReadCloser, error) { func (b bucket) getObjectReaders(objectName, objectMeta string) ([]io.ReadCloser, error) {
var readers []io.ReadCloser var readers []io.ReadCloser
nodeSlice := 0 nodeSlice := 0
for _, node := range b.nodes { for _, node := range b.nodes {
@ -534,8 +542,8 @@ func (b bucket) getReaders(objectName, objectMeta string) ([]io.ReadCloser, erro
return readers, nil return readers, nil
} }
// getWriters - // getObjectWriters -
func (b bucket) getWriters(objectName, objectMeta string) ([]io.WriteCloser, error) { func (b bucket) getObjectWriters(objectName, objectMeta string) ([]io.WriteCloser, error) {
var writers []io.WriteCloser var writers []io.WriteCloser
nodeSlice := 0 nodeSlice := 0
for _, node := range b.nodes { for _, node := range b.nodes {

@ -24,6 +24,8 @@ import (
"sort" "sort"
"strings" "strings"
"unicode/utf8" "unicode/utf8"
"github.com/minio/minio/pkg/utils/atomic"
) )
// IsValidBucket - verify bucket name in accordance with // IsValidBucket - verify bucket name in accordance with
@ -163,3 +165,10 @@ func SortU(objects []string) []string {
sort.Strings(results) sort.Strings(results)
return results return results
} }
// CleanupWritersOnError purge writers on error
func CleanupWritersOnError(writers []io.WriteCloser) {
for _, writer := range writers {
writer.(*atomic.File).CloseAndPurge()
}
}

@ -42,6 +42,8 @@ const (
bucketMetadataVersion = "1.0.0" bucketMetadataVersion = "1.0.0"
) )
/// v1 API functions
// makeBucket - make a new bucket // makeBucket - make a new bucket
func (donut API) makeBucket(bucket string, acl BucketACL) error { func (donut API) makeBucket(bucket string, acl BucketACL) error {
if bucket == "" || strings.TrimSpace(bucket) == "" { if bucket == "" || strings.TrimSpace(bucket) == "" {
@ -204,7 +206,9 @@ func (donut API) getObjectMetadata(bucket, object string) (ObjectMetadata, error
return objectMetadata, nil return objectMetadata, nil
} }
// getDiskWriters - //// internal functions
// getBucketMetadataWriters -
func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) { func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
var writers []io.WriteCloser var writers []io.WriteCloser
for _, node := range donut.nodes { for _, node := range donut.nodes {
@ -213,8 +217,8 @@ func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
return nil, iodine.New(err, nil) return nil, iodine.New(err, nil)
} }
writers = make([]io.WriteCloser, len(disks)) writers = make([]io.WriteCloser, len(disks))
for order, d := range disks { for order, dd := range disks {
bucketMetaDataWriter, err := d.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig)) bucketMetaDataWriter, err := dd.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil { if err != nil {
return nil, iodine.New(err, nil) return nil, iodine.New(err, nil)
} }
@ -224,6 +228,7 @@ func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
return writers, nil return writers, nil
} }
// getBucketMetadataReaders -
func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) { func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) {
var readers []io.ReadCloser var readers []io.ReadCloser
for _, node := range donut.nodes { for _, node := range donut.nodes {
@ -243,24 +248,26 @@ func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) {
return readers, nil return readers, nil
} }
// // setDonutBucketMetadata -
func (donut API) setDonutBucketMetadata(metadata *AllBuckets) error { func (donut API) setDonutBucketMetadata(metadata *AllBuckets) error {
writers, err := donut.getBucketMetadataWriters() writers, err := donut.getBucketMetadataWriters()
if err != nil { if err != nil {
return iodine.New(err, nil) return iodine.New(err, nil)
} }
for _, writer := range writers {
defer writer.Close()
}
for _, writer := range writers { for _, writer := range writers {
jenc := json.NewEncoder(writer) jenc := json.NewEncoder(writer)
if err := jenc.Encode(metadata); err != nil { if err := jenc.Encode(metadata); err != nil {
CleanupWritersOnError(writers)
return iodine.New(err, nil) return iodine.New(err, nil)
} }
} }
for _, writer := range writers {
writer.Close()
}
return nil return nil
} }
// getDonutBucketMetadata -
func (donut API) getDonutBucketMetadata() (*AllBuckets, error) { func (donut API) getDonutBucketMetadata() (*AllBuckets, error) {
metadata := new(AllBuckets) metadata := new(AllBuckets)
readers, err := donut.getBucketMetadataReaders() readers, err := donut.getBucketMetadataReaders()
@ -280,6 +287,7 @@ func (donut API) getDonutBucketMetadata() (*AllBuckets, error) {
return nil, iodine.New(InvalidArgument{}, nil) return nil, iodine.New(InvalidArgument{}, nil)
} }
// makeDonutBucket -
func (donut API) makeDonutBucket(bucketName, acl string) error { func (donut API) makeDonutBucket(bucketName, acl string) error {
if err := donut.listDonutBuckets(); err != nil { if err := donut.listDonutBuckets(); err != nil {
return iodine.New(err, nil) return iodine.New(err, nil)
@ -329,6 +337,7 @@ func (donut API) makeDonutBucket(bucketName, acl string) error {
return nil return nil
} }
// listDonutBuckets -
func (donut API) listDonutBuckets() error { func (donut API) listDonutBuckets() error {
for _, node := range donut.nodes { for _, node := range donut.nodes {
disks, err := node.ListDisks() disks, err := node.ListDisks()

@ -123,6 +123,8 @@ func New() (Interface, error) {
return a, nil return a, nil
} }
/// V2 API functions
// GetObject - GET object from cache buffer // GetObject - GET object from cache buffer
func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) { func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) {
donut.lock.Lock() donut.lock.Lock()

@ -38,6 +38,8 @@ import (
"github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/iodine"
) )
/// V2 API functions
// NewMultipartUpload - initiate a new multipart session // NewMultipartUpload - initiate a new multipart session
func (donut API) NewMultipartUpload(bucket, key, contentType string, signature *Signature) (string, error) { func (donut API) NewMultipartUpload(bucket, key, contentType string, signature *Signature) (string, error) {
donut.lock.Lock() donut.lock.Lock()

Loading…
Cancel
Save