From 8557cbc9b7af422458171fd8439771828e766bb8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 4 Feb 2016 20:40:58 -0800 Subject: [PATCH] fs: Add granular locking. --- pkg/fs/acl.go | 12 ++--- pkg/fs/fs-bucket-listobjects.go | 7 +-- pkg/fs/fs-bucket.go | 90 +++++++++++++++++++-------------- pkg/fs/fs-multipart.go | 89 ++++++++++++++++++-------------- pkg/fs/fs-object.go | 61 ++++++++++++---------- pkg/fs/fs.go | 36 +++++-------- pkg/fs/fs_test.go | 3 +- routers.go | 3 +- server-main.go | 18 +++++++ 9 files changed, 178 insertions(+), 141 deletions(-) diff --git a/pkg/fs/acl.go b/pkg/fs/acl.go index b7f38fede..85e63dfaf 100644 --- a/pkg/fs/acl.go +++ b/pkg/fs/acl.go @@ -18,8 +18,8 @@ package fs // IsPrivateBucket - is private bucket func (fs Filesystem) IsPrivateBucket(bucket string) bool { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() bucketMetadata, ok := fs.buckets.Metadata[bucket] if !ok { return true @@ -29,8 +29,8 @@ func (fs Filesystem) IsPrivateBucket(bucket string) bool { // IsPublicBucket - is public bucket func (fs Filesystem) IsPublicBucket(bucket string) bool { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() bucketMetadata, ok := fs.buckets.Metadata[bucket] if !ok { return true @@ -40,8 +40,8 @@ func (fs Filesystem) IsPublicBucket(bucket string) bool { // IsReadOnlyBucket - is read only bucket func (fs Filesystem) IsReadOnlyBucket(bucket string) bool { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() bucketMetadata, ok := fs.buckets.Metadata[bucket] if !ok { return true diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 576e4d4e1..8269d0fb8 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -271,16 +271,17 @@ func (fs *Filesystem) listObjectsService() *probe.Error { // ListObjects - lists all objects for a given prefix, returns upto // maxKeys number of objects per call. func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.RLock() + defer fs.rwLock.RUnlock() + // Input validation. if !IsValidBucketName(bucket) { return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } bucket = fs.denormalizeBucket(bucket) rootPrefix := filepath.Join(fs.path, bucket) - // check bucket exists + // Check bucket exists. if _, e := os.Stat(rootPrefix); e != nil { if os.IsNotExist(e) { return ListObjectsResult{}, probe.NewError(BucketNotFound{Bucket: bucket}) diff --git a/pkg/fs/fs-bucket.go b/pkg/fs/fs-bucket.go index c42d4c5a2..1352026d3 100644 --- a/pkg/fs/fs-bucket.go +++ b/pkg/fs/fs-bucket.go @@ -32,8 +32,8 @@ import ( // DeleteBucket - delete bucket func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() // verify bucket path legal if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -66,29 +66,18 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { return nil } -func removeDuplicateBuckets(elements []BucketMetadata) (result []BucketMetadata) { - // Use map to record duplicates as we find them. - duplicates := make(map[string]struct{}) - for _, element := range elements { - if _, ok := duplicates[element.Name]; !ok { - duplicates[element.Name] = struct{}{} - result = append(result, element) - } - } - return result -} - -// ListBuckets - Get service +// ListBuckets - Get service. func (fs Filesystem) ListBuckets() ([]BucketMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.RLock() + defer fs.rwLock.RUnlock() files, err := ioutils.ReadDirN(fs.path, fs.maxBuckets) if err != nil && err != io.EOF { return []BucketMetadata{}, probe.NewError(err) } if err == io.EOF { - fmt.Printf("Truncating the bucket list to %d entries only.", fs.maxBuckets) + // This message is printed if there are more than 1000 buckets. + fmt.Printf("More buckets found, truncating the bucket list to %d entries only.", fs.maxBuckets) } var metadataList []BucketMetadata for _, file := range files { @@ -98,7 +87,7 @@ func (fs Filesystem) ListBuckets() ([]BucketMetadata, *probe.Error) { } dirName := strings.ToLower(file.Name()) if file.IsDir() { - // if directories found with odd names, skip them too + // If directories found with odd names, skip them. if !IsValidBucketName(dirName) { continue } @@ -109,50 +98,67 @@ func (fs Filesystem) ListBuckets() ([]BucketMetadata, *probe.Error) { } metadataList = append(metadataList, metadata) } + // Remove duplicated entries. metadataList = removeDuplicateBuckets(metadataList) return metadataList, nil } -// MakeBucket - PUT Bucket +// removeDuplicateBuckets - remove duplicate buckets. +func removeDuplicateBuckets(elements []BucketMetadata) (result []BucketMetadata) { + // Use map to record duplicates as we find them. + duplicates := make(map[string]struct{}) + for _, element := range elements { + if _, ok := duplicates[element.Name]; !ok { + duplicates[element.Name] = struct{}{} + result = append(result, element) + } + } + return result +} + +// MakeBucket - PUT Bucket. func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() di, err := disk.GetInfo(fs.path) if err != nil { return probe.NewError(err) } - // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. + // Remove 5% from total space for cumulative disk space used for + // journalling, inodes etc. availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 if int64(availableDiskSpace) <= fs.minFreeDisk { return probe.NewError(RootPathFull{Path: fs.path}) } - // verify bucket path legal + // Verify if bucket path legal. if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } + // Verify if bucket acl is legal. if !IsValidBucketACL(acl) { return probe.NewError(InvalidACL{ACL: acl}) } bucket = fs.denormalizeBucket(bucket) - // get bucket path + + // Get bucket path. bucketDir := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketDir); e == nil { return probe.NewError(BucketExists{Bucket: bucket}) } - // make bucket + // Make bucket. if e := os.Mkdir(bucketDir, 0700); e != nil { return probe.NewError(err) } bucketMetadata := &BucketMetadata{} fi, e := os.Stat(bucketDir) - // check if bucket exists + // Check if bucket exists. if e != nil { if os.IsNotExist(e) { return probe.NewError(BucketNotFound{Bucket: bucket}) @@ -172,12 +178,17 @@ func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error { return nil } +// denormalizeBucket - will convert incoming bucket names to +// corresponding valid bucketnames on the backend in a platform +// compatible way for all operating systems. func (fs Filesystem) denormalizeBucket(bucket string) string { - buckets, err := ioutils.ReadDirNamesN(fs.path, fs.maxBuckets) - if err != nil { + buckets, e := ioutils.ReadDirNamesN(fs.path, fs.maxBuckets) + if e != nil { return bucket } for _, b := range buckets { + // Verify if lowercase version of the bucket is equal to the + // incoming bucket, then use the proper name. if strings.ToLower(b) == bucket { return b } @@ -185,21 +196,20 @@ func (fs Filesystem) denormalizeBucket(bucket string) string { return bucket } -// GetBucketMetadata - get bucket metadata +// GetBucketMetadata - get bucket metadata. func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.RLock() + defer fs.rwLock.RUnlock() if !IsValidBucketName(bucket) { return BucketMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } bucket = fs.denormalizeBucket(bucket) - - // get bucket path + // Get bucket path. bucketDir := filepath.Join(fs.path, bucket) fi, e := os.Stat(bucketDir) if e != nil { - // check if bucket exists + // Check if bucket exists. if os.IsNotExist(e) { return BucketMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket}) } @@ -215,13 +225,15 @@ func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Er return *bucketMetadata, nil } -// SetBucketMetadata - set bucket metadata +// SetBucketMetadata - set bucket metadata. func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string) *probe.Error { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() + // Input validation. if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } + // Save the acl. acl := metadata["acl"] if !IsValidBucketACL(acl) { return probe.NewError(InvalidACL{ACL: acl}) @@ -233,7 +245,7 @@ func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string bucketDir := filepath.Join(fs.path, bucket) fi, e := os.Stat(bucketDir) if e != nil { - // check if bucket exists + // Check if bucket exists. if os.IsNotExist(e) { return probe.NewError(BucketNotFound{Bucket: bucket}) } diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go index e1a0c3cca..997c7d01f 100644 --- a/pkg/fs/fs-multipart.go +++ b/pkg/fs/fs-multipart.go @@ -42,6 +42,7 @@ import ( "github.com/minio/minio/pkg/disk" ) +// isValidUploadID - is upload id. func (fs Filesystem) isValidUploadID(object, uploadID string) bool { s, ok := fs.multiparts.ActiveSession[object] if !ok { @@ -55,15 +56,17 @@ func (fs Filesystem) isValidUploadID(object, uploadID string) bool { // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.RLock() + defer fs.rwLock.RUnlock() + + // Input validation. if !IsValidBucketName(bucket) { return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketPath); e != nil { - // check bucket exists + // Check bucket exists. if os.IsNotExist(e) { return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket}) } @@ -80,7 +83,7 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa resources.IsTruncated = true return resources, nil } - // uploadIDMarker is ignored if KeyMarker is empty + // UploadIDMarker is ignored if KeyMarker is empty. switch { case resources.KeyMarker != "" && resources.UploadIDMarker == "": if object > resources.KeyMarker { @@ -114,6 +117,7 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa return resources, nil } +// concatenate parts. func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error { for _, part := range parts.Part { partFile, e := os.OpenFile(objectPath+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) @@ -123,15 +127,13 @@ func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath stri } recvMD5 := part.ETag - // complete multipart request header md5sum per part is hex encoded - // trim it and decode if possible. - _, e = hex.DecodeString(strings.Trim(recvMD5, "\"")) - if e != nil { + // Complete multipart request header md5sum per part is hex + // encoded trim it and decode if possible. + if _, e = hex.DecodeString(strings.Trim(recvMD5, "\"")); e != nil { return probe.NewError(InvalidDigest{Md5: recvMD5}) } - _, e = io.Copy(mw, partFile) - if e != nil { + if _, e = io.Copy(mw, partFile); e != nil { return probe.NewError(e) } } @@ -140,20 +142,22 @@ func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath stri // NewMultipartUpload - initiate a new multipart session func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() di, e := disk.GetInfo(fs.path) if e != nil { return "", probe.NewError(e) } - // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. + // Remove 5% from total space for cumulative disk space used for + // journalling, inodes etc. availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 if int64(availableDiskSpace) <= fs.minFreeDisk { return "", probe.NewError(RootPathFull{Path: fs.path}) } + // Input validation. if !IsValidBucketName(bucket) { return "", probe.NewError(BucketNameInvalid{Bucket: bucket}) } @@ -164,7 +168,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, e = os.Stat(bucketPath); e != nil { - // check bucket exists + // Check bucket exists. if os.IsNotExist(e) { return "", probe.NewError(BucketNotFound{Bucket: bucket}) } @@ -183,6 +187,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E } } + // Generate new upload id. id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + object + time.Now().String()) uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] @@ -193,6 +198,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E } defer multiPartfile.Close() + // Initialize multipart session. mpartSession := &MultipartSession{} mpartSession.TotalParts = 0 mpartSession.UploadID = uploadID @@ -211,7 +217,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E return uploadID, nil } -// partNumber is a sortable interface for Part slice +// partNumber is a sortable interface for Part slice. type partNumber []*PartMetadata func (a partNumber) Len() int { return len(a) } @@ -220,33 +226,37 @@ func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumb // CreateObjectPart - create a part in a multipart session func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() di, err := disk.GetInfo(fs.path) if err != nil { return "", probe.NewError(err) } - // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. + // Remove 5% from total space for cumulative disk space used for + // journalling, inodes etc. availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 if int64(availableDiskSpace) <= fs.minFreeDisk { return "", probe.NewError(RootPathFull{Path: fs.path}) } + // Part id cannot be negative. if partID <= 0 { return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero")) } - // check bucket name valid + + // Check bucket name valid. if !IsValidBucketName(bucket) { return "", probe.NewError(BucketNameInvalid{Bucket: bucket}) } - // verify object path legal + // Verify object path legal. if !IsValidObjectName(object) { return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } + // Verify upload is valid for the incoming object. if !fs.isValidUploadID(object, uploadID) { return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) } @@ -255,7 +265,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s var expectedMD5SumBytes []byte expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) if err != nil { - // pro-actively close the connection + // Pro-actively close the connection return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum}) } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) @@ -264,7 +274,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, err = os.Stat(bucketPath); err != nil { - // check bucket exists + // Check bucket exists. if os.IsNotExist(err) { return "", probe.NewError(BucketNotFound{Bucket: bucket}) } @@ -285,7 +295,8 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s return "", probe.NewError(e) } md5sum := hex.EncodeToString(h.Sum(nil)) - // Verify if the written object is equal to what is expected, only if it is requested as such + // Verify if the written object is equal to what is expected, only + // if it is requested as such. if strings.TrimSpace(expectedMD5Sum) != "" { if !isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5sum) { partFile.CloseAndPurge() @@ -340,19 +351,20 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s // CompleteMultipartUpload - complete a multipart upload and persist the data func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() - // check bucket name valid + // Check bucket name is valid. if !IsValidBucketName(bucket) { return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } - // verify object path legal + // Verify object path is legal. if !IsValidObjectName(object) { return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } + // Verify if valid upload for incoming object. if !fs.isValidUploadID(object, uploadID) { return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) } @@ -360,7 +372,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketPath); e != nil { - // check bucket exists + // Check bucket exists. if os.IsNotExist(e) { return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket}) } @@ -446,19 +458,20 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da // ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() - // check bucket name valid + // Check bucket name is valid. if !IsValidBucketName(bucket) { return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } - // verify object path legal + // Verify object path legal. if !IsValidObjectName(object) { return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } + // Verify if upload id is valid for incoming object. if !fs.isValidUploadID(object, resources.UploadID) { return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID}) } @@ -477,7 +490,7 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketPath); e != nil { - // check bucket exists + // Check bucket exists. if os.IsNotExist(e) { return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket}) } @@ -514,15 +527,15 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso // AbortMultipartUpload - abort an incomplete multipart session func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() - // check bucket name valid + // Check bucket name valid. if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } - // verify object path legal + // Verify object path legal. if !IsValidObjectName(object) { return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } @@ -534,7 +547,7 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketPath); e != nil { - // check bucket exists + // Check bucket exists. if os.IsNotExist(e) { return probe.NewError(BucketNotFound{Bucket: bucket}) } diff --git a/pkg/fs/fs-object.go b/pkg/fs/fs-object.go index 997f5cb0b..c0082826c 100644 --- a/pkg/fs/fs-object.go +++ b/pkg/fs/fs-object.go @@ -40,19 +40,18 @@ import ( // GetObject - GET object func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length int64) (int64, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.RLock() + defer fs.rwLock.RUnlock() - // validate bucket + // Input validation. if !IsValidBucketName(bucket) { return 0, probe.NewError(BucketNameInvalid{Bucket: bucket}) } - - // validate object if !IsValidObjectName(object) { return 0, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } + // normalize buckets. bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketPath); e != nil { @@ -101,11 +100,12 @@ func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length return count, nil } -// GetObjectMetadata - HEAD object +// GetObjectMetadata - get object metadata. func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.RLock() + defer fs.rwLock.RUnlock() + // Input validation. if !IsValidBucketName(bucket) { return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } @@ -114,6 +114,7 @@ func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, * return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: bucket}) } + // normalize buckets. bucket = fs.denormalizeBucket(bucket) bucketPath := filepath.Join(fs.path, bucket) if _, e := os.Stat(bucketPath); e != nil { @@ -133,9 +134,12 @@ func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, * return metadata, nil } +// getMetadata - get object metadata. func getMetadata(rootPath, bucket, object string) (ObjectMetadata, *probe.Error) { - // Do not use filepath.Join() since filepath.Join strips off any object names with '/', use them as is - // in a static manner so that we can send a proper 'ObjectNotFound' reply back upon os.Stat() + // Do not use filepath.Join() since filepath.Join strips off any + // object names with '/', use them as is in a static manner so + // that we can send a proper 'ObjectNotFound' reply back upon + // os.Stat(). var objectPath string // For windows use its special os.PathSeparator == "\\" if runtime.GOOS == "windows" { @@ -187,23 +191,24 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) bool { return false } -// CreateObject - PUT object +// CreateObject - create an object. func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() di, e := disk.GetInfo(fs.path) if e != nil { return ObjectMetadata{}, probe.NewError(e) } - // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. + // Remove 5% from total space for cumulative disk space used for + // journalling, inodes etc. availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 if int64(availableDiskSpace) <= fs.minFreeDisk { return ObjectMetadata{}, probe.NewError(RootPathFull{Path: fs.path}) } - // check bucket name valid + // Check bucket name valid. if !IsValidBucketName(bucket) { return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } @@ -216,24 +221,24 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in } return ObjectMetadata{}, probe.NewError(e) } - // verify object path legal + // Verify object path legal. if !IsValidObjectName(object) { return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // get object path + // Get object path. objectPath := filepath.Join(bucketPath, object) if strings.TrimSpace(expectedMD5Sum) != "" { var expectedMD5SumBytes []byte expectedMD5SumBytes, e = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) if e != nil { - // pro-actively close the connection + // Pro-actively close the connection. return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: expectedMD5Sum}) } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - // write object + // Write object. file, e := atomic.FileCreateWithPrefix(objectPath, "") if e != nil { switch e := e.(type) { @@ -266,7 +271,8 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in } md5Sum := hex.EncodeToString(h.Sum(nil)) - // Verify if the written object is equal to what is expected, only if it is requested as such + // Verify if the written object is equal to what is expected, only + // if it is requested as such. if strings.TrimSpace(expectedMD5Sum) != "" { if !isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum) { file.CloseAndPurge() @@ -306,18 +312,19 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in return newObject, nil } +// deleteObjectPath - delete object path if its empty. func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error { if basePath == deletePath { return nil } - fi, e := os.Stat(deletePath) + pathSt, e := os.Stat(deletePath) if e != nil { if os.IsNotExist(e) { return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) } return probe.NewError(e) } - if fi.IsDir() { + if pathSt.IsDir() { empty, e := ioutils.IsDirEmpty(deletePath) if e != nil { return probe.NewError(e) @@ -337,8 +344,8 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error // DeleteObject - delete and object func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error { - fs.lock.Lock() - defer fs.lock.Unlock() + fs.rwLock.Lock() + defer fs.rwLock.Unlock() // check bucket name valid if !IsValidBucketName(bucket) { @@ -369,10 +376,10 @@ func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error { objectPath = fs.path + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object } err := deleteObjectPath(bucketPath, objectPath, bucket, object) - if os.IsNotExist(err.ToGoError()) { - return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) - } if err != nil { + if os.IsNotExist(err.ToGoError()) { + return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) + } return err.Trace(bucketPath, objectPath, bucket, object) } return nil diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index ae229e56a..1b685fa8c 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -31,7 +31,7 @@ type Filesystem struct { path string minFreeDisk int64 maxBuckets int - lock *sync.Mutex + rwLock *sync.RWMutex multiparts *Multiparts buckets *Buckets listServiceReqCh chan<- listServiceReq @@ -59,7 +59,7 @@ type Multiparts struct { } // New instantiate a new donut -func New(rootPath string) (Filesystem, *probe.Error) { +func New(rootPath string, minFreeDisk int64, maxBuckets int) (Filesystem, *probe.Error) { setFSBucketsConfigPath(filepath.Join(rootPath, "$buckets.json")) setFSMultipartsConfigPath(filepath.Join(rootPath, "$multiparts-session.json")) @@ -80,8 +80,11 @@ func New(rootPath string) (Filesystem, *probe.Error) { return Filesystem{}, err.Trace() } } - // Initialize content db. - contentdb.Init() + + // Initialize contentdb. + if e := contentdb.Init(); e != nil { + return Filesystem{}, probe.NewError(e) + } var buckets *Buckets buckets, err = loadBucketsMetadata() @@ -98,16 +101,18 @@ func New(rootPath string) (Filesystem, *probe.Error) { return Filesystem{}, err.Trace() } } - fs := Filesystem{lock: new(sync.Mutex)} + fs := Filesystem{ + rwLock: &sync.RWMutex{}, + } fs.path = rootPath fs.multiparts = multiparts fs.buckets = buckets /// Defaults // maximum buckets to be listed from list buckets. - fs.maxBuckets = 1000 + fs.maxBuckets = maxBuckets // minium free disk required for i/o operations to succeed. - fs.minFreeDisk = 10 + fs.minFreeDisk = minFreeDisk // Start list goroutine. if err = fs.listObjectsService(); err != nil { @@ -116,20 +121,3 @@ func New(rootPath string) (Filesystem, *probe.Error) { // Return here. return fs, nil } - -// SetMinFreeDisk - set min free disk -func (fs *Filesystem) SetMinFreeDisk(minFreeDisk int64) { - fs.lock.Lock() - defer fs.lock.Unlock() - fs.minFreeDisk = minFreeDisk -} - -// SetMaxBuckets - set total number of buckets supported, default is 100. -func (fs *Filesystem) SetMaxBuckets(maxBuckets int) { - fs.lock.Lock() - defer fs.lock.Unlock() - if maxBuckets == 0 { - maxBuckets = 100 - } - fs.maxBuckets = maxBuckets -} diff --git a/pkg/fs/fs_test.go b/pkg/fs/fs_test.go index f0bad5415..b113416ea 100644 --- a/pkg/fs/fs_test.go +++ b/pkg/fs/fs_test.go @@ -36,8 +36,7 @@ func (s *MySuite) TestAPISuite(c *C) { path, e := ioutil.TempDir(os.TempDir(), "minio-") c.Check(e, IsNil) storageList = append(storageList, path) - store, err := New(path) - store.SetMinFreeDisk(0) + store, err := New(path, 0, 1000) c.Check(err, IsNil) return store } diff --git a/routers.go b/routers.go index 77ab7544b..665e80a53 100644 --- a/routers.go +++ b/routers.go @@ -143,10 +143,9 @@ func getNewWebAPI(conf cloudServerConfig) *WebAPI { // getNewCloudStorageAPI instantiate a new CloudStorageAPI. func getNewCloudStorageAPI(conf cloudServerConfig) CloudStorageAPI { - fs, err := fs.New(conf.Path) + fs, err := fs.New(conf.Path, conf.MinFreeDisk, conf.MaxBuckets) fatalIf(err.Trace(), "Initializing filesystem failed.", nil) - fs.SetMinFreeDisk(conf.MinFreeDisk) return CloudStorageAPI{ Filesystem: fs, AccessLog: conf.AccessLog, diff --git a/server-main.go b/server-main.go index 835b44dd3..9bd77aaa1 100644 --- a/server-main.go +++ b/server-main.go @@ -58,6 +58,9 @@ EXAMPLES: 4. Start minio server with minimum free disk threshold to 5% $ minio {{.Name}} min-free-disk 5% /home/shared/Pictures + 5. Start minio server with minimum free disk threshold to 15% and support upto 2000 buckets. + $ minio {{.Name}} min-free-disk 15% /home/shared/Documents max-buckets 2000 + `, } @@ -74,6 +77,7 @@ type cloudServerConfig struct { /// FS options Path string // Path to export for cloud storage MinFreeDisk int64 // Minimum free disk space for filesystem + MaxBuckets int // Maximum number of buckets suppported by filesystem. /// TLS service TLS bool // TLS on when certs are specified @@ -284,9 +288,12 @@ func serverMain(c *cli.Context) { } var minFreeDisk int64 + var maxBuckets int minFreeDiskSet := false + maxBucketsSet := false // Default minFreeDisk = 10 + maxBuckets = 1000 args := c.Args() for len(args) >= 2 { @@ -300,6 +307,16 @@ func serverMain(c *cli.Context) { fatalIf(err.Trace(args.First()), "Invalid minium free disk size "+args.First()+" passed.", nil) args = args.Tail() minFreeDiskSet = true + case "max-buckets": + if maxBucketsSet { + fatalIf(probe.NewError(errInvalidArgument), "Maximum buckets should be set only once.", nil) + } + args = args.Tail() + var e error + maxBuckets, e = strconv.Atoi(args.First()) + fatalIf(probe.NewError(e), "Invalid max buckets "+args.First()+" passed.", nil) + args = args.Tail() + maxBucketsSet = true default: cli.ShowCommandHelpAndExit(c, "server", 1) // last argument is exit code } @@ -318,6 +335,7 @@ func serverMain(c *cli.Context) { SecretAccessKey: conf.Credentials.SecretAccessKey, Path: path, MinFreeDisk: minFreeDisk, + MaxBuckets: maxBuckets, TLS: tls, CertFile: certFile, KeyFile: keyFile,