From 4e6e78598fe636ee5b72aba1c4e902f3c4f79ec5 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 6 Feb 2016 01:36:43 -0800 Subject: [PATCH] multipart: Increase locked critical for CompleteMultipart. --- pkg/fs/fs-bucket.go | 48 ++++++++++++++++++++++++++---------------- pkg/fs/fs-multipart.go | 16 ++++++++++++-- pkg/fs/fs-object.go | 43 +++++++++++++++++++++++++------------ 3 files changed, 73 insertions(+), 34 deletions(-) diff --git a/pkg/fs/fs-bucket.go b/pkg/fs/fs-bucket.go index 314fd8778..ffcdb7d3e 100644 --- a/pkg/fs/fs-bucket.go +++ b/pkg/fs/fs-bucket.go @@ -30,15 +30,15 @@ import ( /// Bucket Operations -// DeleteBucket - delete bucket +// DeleteBucket - delete a bucket. func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { - // verify bucket path legal + // Verify bucket is valid. if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } bucket = fs.denormalizeBucket(bucket) bucketDir := filepath.Join(fs.path, bucket) - // check bucket exists + // Check if bucket exists. if _, e := os.Stat(bucketDir); e != nil { if os.IsNotExist(e) { return probe.NewError(BucketNotFound{Bucket: bucket}) @@ -57,12 +57,15 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { } return probe.NewError(e) } + + // Critical region hold write lock. fs.rwLock.Lock() delete(fs.buckets.Metadata, bucket) - fs.rwLock.Unlock() if err := saveBucketsMetadata(*fs.buckets); err != nil { + fs.rwLock.Unlock() return err.Trace(bucket) } + fs.rwLock.Unlock() return nil } @@ -73,21 +76,20 @@ func (fs Filesystem) ListBuckets() ([]BucketMetadata, *probe.Error) { return []BucketMetadata{}, probe.NewError(err) } if err == io.EOF { - // This message is printed if there are more than 1000 buckets. + // This message is printed if there are more than 1000 buckets + // and we saw io.EOF. 
fmt.Printf("More buckets found, truncating the bucket list to %d entries only.", fs.maxBuckets) } var metadataList []BucketMetadata for _, file := range files { if !file.IsDir() { - // if files found ignore them + // If not directory, ignore all file types. continue } + // If directories are found with odd names, skip them. dirName := strings.ToLower(file.Name()) - if file.IsDir() { - // If directories found with odd names, skip them. - if !IsValidBucketName(dirName) { - continue - } + if file.IsDir() && !IsValidBucketName(dirName) { + continue } metadata := BucketMetadata{ Name: dirName, @@ -127,19 +129,18 @@ func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error { return probe.NewError(RootPathFull{Path: fs.path}) } - // Verify if bucket path legal. + // Verify if bucket is valid. if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } - // Verify if bucket acl is legal. + // Verify if bucket acl is valid. if !IsValidBucketACL(acl) { return probe.NewError(InvalidACL{ACL: acl}) } - bucket = fs.denormalizeBucket(bucket) - // Get bucket path. + bucket = fs.denormalizeBucket(bucket) bucketDir := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketDir); e == nil { return probe.NewError(BucketExists{Bucket: bucket}) @@ -150,7 +151,6 @@ func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error { return probe.NewError(err) } - bucketMetadata := &BucketMetadata{} fi, e := os.Stat(bucketDir) // Check if bucket exists. if e != nil { @@ -162,15 +162,21 @@ func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error { if strings.TrimSpace(acl) == "" { acl = "private" } + + // Get a new bucket name metadata. + bucketMetadata := &BucketMetadata{} bucketMetadata.Name = fi.Name() bucketMetadata.Created = fi.ModTime() bucketMetadata.ACL = BucketACL(acl) + + // Critical region hold a write lock. 
fs.rwLock.Lock() fs.buckets.Metadata[bucket] = bucketMetadata - fs.rwLock.Unlock() if err := saveBucketsMetadata(*fs.buckets); err != nil { + fs.rwLock.Unlock() return err.Trace(bucket) } + fs.rwLock.Unlock() return nil } @@ -212,6 +218,7 @@ func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Er fs.rwLock.RLock() bucketMetadata, ok := fs.buckets.Metadata[bucket] fs.rwLock.RUnlock() + // If metadata value is not found, get it from disk. if !ok { bucketMetadata = &BucketMetadata{} bucketMetadata.Name = fi.Name() @@ -245,6 +252,8 @@ func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string } return probe.NewError(e) } + + // Critical region handle read lock. fs.rwLock.RLock() bucketMetadata, ok := fs.buckets.Metadata[bucket] fs.rwLock.RUnlock() @@ -254,11 +263,14 @@ func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string bucketMetadata.Created = fi.ModTime() } bucketMetadata.ACL = BucketACL(acl) + + // Critical region handle write lock. fs.rwLock.Lock() fs.buckets.Metadata[bucket] = bucketMetadata - fs.rwLock.Unlock() if err := saveBucketsMetadata(*fs.buckets); err != nil { + fs.rwLock.Unlock() return err.Trace(bucket) } + fs.rwLock.Unlock() return nil } diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go index 71125dc32..21620267c 100644 --- a/pkg/fs/fs-multipart.go +++ b/pkg/fs/fs-multipart.go @@ -236,6 +236,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] + // Critical region requiring write lock. fs.rwLock.Lock() // Initialize multipart session. mpartSession := &MultipartSession{} @@ -364,6 +365,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s partMetadata.Size = fi.Size() partMetadata.LastModified = fi.ModTime() + // Critical region requiring read lock. 
fs.rwLock.RLock() deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID] fs.rwLock.RUnlock() @@ -382,6 +384,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s // Sort by part number before saving. sort.Sort(partNumber(deserializedMultipartSession.Parts)) + // Critical region requiring write lock. fs.rwLock.Lock() fs.multiparts.ActiveSession[uploadID] = deserializedMultipartSession if err := saveMultipartsSession(*fs.multiparts); err != nil { @@ -390,6 +393,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s } fs.rwLock.Unlock() + // Return etag. return partMetadata.ETag, nil } @@ -459,6 +463,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da // Save parts for verification. parts := completeMultipartUpload.Part + // Critical region requiring read lock. fs.rwLock.RLock() savedParts := fs.multiparts.ActiveSession[uploadID].Parts fs.rwLock.RUnlock() @@ -468,6 +473,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da return ObjectMetadata{}, probe.NewError(InvalidPart{}) } + // Parts successfully validated, save all the parts. partPathPrefix := objectPath + uploadID if err := saveParts(partPathPrefix, objectWriter, parts); err != nil { file.CloseAndPurge() @@ -476,6 +482,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da // Successfully saved, remove all parts. removeParts(partPathPrefix, savedParts) + // Critical region requiring write lock. fs.rwLock.Lock() delete(fs.multiparts.ActiveSession, uploadID) if err := saveMultipartsSession(*fs.multiparts); err != nil { @@ -483,10 +490,10 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da file.CloseAndPurge() return ObjectMetadata{}, err.Trace(partPathPrefix) } - fs.rwLock.Unlock() - file.Close() + fs.rwLock.Unlock() + // Send stat again to get object metadata. 
st, e := os.Stat(objectPath) if e != nil { return ObjectMetadata{}, probe.NewError(e) @@ -550,6 +557,7 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso return ObjectResourcesMetadata{}, probe.NewError(e) } + // Critical region requiring read lock. fs.rwLock.RLock() deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID] fs.rwLock.RUnlock() @@ -600,14 +608,18 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob objectPath := filepath.Join(bucketPath, object) partPathPrefix := objectPath + uploadID + + // Critical region requiring read lock. fs.rwLock.RLock() savedParts := fs.multiparts.ActiveSession[uploadID].Parts fs.rwLock.RUnlock() + // Remove all parts. if err := removeParts(partPathPrefix, savedParts); err != nil { return err.Trace(partPathPrefix) } + // Critical region requiring write lock. fs.rwLock.Lock() delete(fs.multiparts.ActiveSession, uploadID) if err := saveMultipartsSession(*fs.multiparts); err != nil { diff --git a/pkg/fs/fs-object.go b/pkg/fs/fs-object.go index b38d37fe0..bd401b214 100644 --- a/pkg/fs/fs-object.go +++ b/pkg/fs/fs-object.go @@ -40,6 +40,7 @@ import ( // GetObject - GET object func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error) { + // Critical region requiring read lock. fs.rwLock.RLock() defer fs.rwLock.RUnlock() @@ -111,7 +112,7 @@ func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, * return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: bucket}) } - // normalize buckets. + // Normalize buckets. 
bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketPath); e != nil { @@ -175,7 +176,10 @@ func getMetadata(rootPath, bucket, object string) (ObjectMetadata, *probe.Error) // isMD5SumEqual - returns error if md5sum mismatches, success its `nil` func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) bool { + // Verify the md5sum. if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" { + // Decode md5sum to bytes from their hexadecimal + // representations. expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum) if err != nil { return false @@ -184,6 +188,7 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) bool { if err != nil { return false } + // Verify md5sum bytes are equal after successful decoding. if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) { return false } @@ -219,6 +224,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in } return ObjectMetadata{}, probe.NewError(e) } + // Verify object path legal. if !IsValidObjectName(object) { return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) @@ -252,23 +258,24 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in } } - h := md5.New() - sh := sha256.New() - mw := io.MultiWriter(file, h, sh) + // Instantiate checksum hashers and create a multiwriter. 
+ md5Hasher := md5.New() + sha256Hasher := sha256.New() + objectWriter := io.MultiWriter(file, md5Hasher, sha256Hasher) if size > 0 { - if _, e = io.CopyN(mw, data, size); e != nil { + if _, e = io.CopyN(objectWriter, data, size); e != nil { file.CloseAndPurge() return ObjectMetadata{}, probe.NewError(e) } } else { - if _, e = io.Copy(mw, data); e != nil { + if _, e = io.Copy(objectWriter, data); e != nil { file.CloseAndPurge() return ObjectMetadata{}, probe.NewError(e) } } - md5Sum := hex.EncodeToString(h.Sum(nil)) + md5Sum := hex.EncodeToString(md5Hasher.Sum(nil)) // Verify if the written object is equal to what is expected, only // if it is requested as such. if strings.TrimSpace(expectedMD5Sum) != "" { @@ -277,7 +284,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in return ObjectMetadata{}, probe.NewError(BadDigest{MD5: expectedMD5Sum, Bucket: bucket, Object: object}) } } - sha256Sum := hex.EncodeToString(sh.Sum(nil)) + sha256Sum := hex.EncodeToString(sha256Hasher.Sum(nil)) if signature != nil { ok, err := signature.DoesSignatureMatch(sha256Sum) if err != nil { @@ -291,6 +298,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in } file.Close() + // Set stat again to get the latest metadata. st, e := os.Stat(objectPath) if e != nil { return ObjectMetadata{}, probe.NewError(e) @@ -318,6 +326,7 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error if basePath == deletePath { return nil } + // Verify if the path exists. pathSt, e := os.Stat(deletePath) if e != nil { if os.IsNotExist(e) { @@ -326,6 +335,7 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error return probe.NewError(e) } if pathSt.IsDir() { + // Verify if directory is empty. 
empty, e := ioutils.IsDirEmpty(deletePath) if e != nil { return probe.NewError(e) @@ -334,25 +344,27 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error return nil } } + // Attempt to remove path. if e := os.Remove(deletePath); e != nil { return probe.NewError(e) } + // Recursively go down the next path and delete again. if err := deleteObjectPath(basePath, filepath.Dir(deletePath), bucket, object); err != nil { return err.Trace(basePath, deletePath, bucket, object) } return nil } -// DeleteObject - delete and object +// DeleteObject - delete object. func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error { - // check bucket name valid + // Check bucket name valid if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) - // check bucket exists + // Check bucket exists if _, e := os.Stat(bucketPath); e != nil { if os.IsNotExist(e) { return probe.NewError(BucketNotFound{Bucket: bucket}) @@ -360,19 +372,22 @@ func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error { return probe.NewError(e) } - // verify object path legal + // Verify object path legal if !IsValidObjectName(object) { return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // Do not use filepath.Join() since filepath.Join strips off any object names with '/', use them as is - // in a static manner so that we can send a proper 'ObjectNotFound' reply back upon os.Stat() + // Do not use filepath.Join() since filepath.Join strips off any + // object names with '/', use them as is in a static manner so + // that we can send a proper 'ObjectNotFound' reply back upon + // os.Stat(). 
var objectPath string if runtime.GOOS == "windows" { objectPath = fs.path + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object } else { objectPath = fs.path + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object } + // Delete object path if it is empty. err := deleteObjectPath(bucketPath, objectPath, bucket, object) if err != nil { if os.IsNotExist(err.ToGoError()) {