From 3ebe61abdf5b18d7b4b4df75d97e3acd2c83b447 Mon Sep 17 00:00:00 2001 From: Bala FA Date: Wed, 28 Mar 2018 05:14:45 +0530 Subject: [PATCH] Quick support to server level WORM (#5602) This is a trival fix to support server level WORM. The feature comes with an environment variable `MINIO_WORM`. Usage: ``` $ export MINIO_WORM=on $ minio server endpoint ``` --- cmd/api-errors.go | 2 ++ cmd/bucket-handlers.go | 8 +++++ cmd/common-main.go | 3 ++ cmd/fs-v1-multipart.go | 9 +++++- cmd/fs-v1.go | 6 ++++ cmd/globals.go | 2 ++ cmd/object-api-errors.go | 9 +++++- cmd/object-handlers.go | 64 ++++++++++++++++++++++++++++++++++++++++ cmd/server-main.go | 3 ++ cmd/xl-v1-multipart.go | 9 +++++- cmd/xl-v1-object.go | 9 +++++- 11 files changed, 120 insertions(+), 4 deletions(-) diff --git a/cmd/api-errors.go b/cmd/api-errors.go index a8ff1953c..c8439e4ed 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -911,6 +911,8 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) { apiErr = ErrBucketAlreadyOwnedByYou case ObjectNotFound: apiErr = ErrNoSuchKey + case ObjectAlreadyExists: + apiErr = ErrMethodNotAllowed case ObjectNameInvalid: apiErr = ErrInvalidObjectName case InvalidUploadID: diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 5354f622f..90c74d8ad 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -300,6 +300,14 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, return } + // Deny if WORM is enabled + if globalWORMEnabled { + // Not required to check whether given objects exist or not, because + // DeleteMultipleObject is always successful irrespective of object existence. + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + var wg = &sync.WaitGroup{} // Allocate a new wait group. var dErrs = make([]error, len(deleteObjects.Objects)) diff --git a/cmd/common-main.go b/cmd/common-main.go index 4668fb58f..0fa220f99 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -158,4 +158,7 @@ func handleCommonEnvVars() { globalIsStorageClass = true } } + + // Get WORM environment variable. + globalWORMEnabled = strings.EqualFold(os.Getenv("MINIO_WORM"), "on") } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index d45dcb4ee..5fd8a1d03 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. + * Minio Cloud Storage, (C) 2016, 2017, 2018 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -626,6 +626,13 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, return oi, toObjectErr(errors.Trace(err), bucket, object) } + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err = fsStatFile(pathJoin(fs.fsPath, bucket, object)); err == nil { + return ObjectInfo{}, errors.Trace(ObjectAlreadyExists{Bucket: bucket, Object: object}) + } + } + err = fsRenameFile(appendFilePath, pathJoin(fs.fsPath, bucket, object)) if err != nil { return oi, toObjectErr(errors.Trace(err), bucket, object) diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 772fa4020..8031d14ad 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -745,6 +745,12 @@ func (fs *FSObjects) putObject(bucket string, object string, data *hash.Reader, // Entire object was written to the temp location, now it's safe to rename it to the actual location. fsNSObjPath := pathJoin(fs.fsPath, bucket, object) + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err = fsStatFile(fsNSObjPath); err == nil { + return ObjectInfo{}, errors.Trace(ObjectAlreadyExists{Bucket: bucket, Object: object}) + } + } if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } diff --git a/cmd/globals.go b/cmd/globals.go index 5cb8c90ed..8a4778e0a 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -172,6 +172,8 @@ var ( // Set to store standard storage class globalStandardStorageClass storageClass + globalWORMEnabled bool + // Add new variable global values here. ) diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index 65fa767ba..7b44c2dc7 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2015 Minio, Inc. + * Minio Cloud Storage, (C) 2015, 2016, 2017, 2018 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -172,6 +172,13 @@ func (e ObjectNotFound) Error() string { return "Object not found: " + e.Bucket + "#" + e.Object } +// ObjectAlreadyExists object already exists. +type ObjectAlreadyExists GenericError + +func (e ObjectAlreadyExists) Error() string { + return "Object: " + e.Bucket + "#" + e.Object + " already exists" +} + // ObjectExistsAsDirectory object already exists as a directory. type ObjectExistsAsDirectory GenericError diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 8b05d1716..01504eeba 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -360,6 +360,14 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re return } + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err = objectAPI.GetObjectInfo(ctx, dstBucket, dstObject); err == nil { + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + } + if objectAPI.IsEncryptionSupported() { if apiErr, _ := DecryptCopyObjectInfo(&srcInfo, r.Header); apiErr != ErrNone { writeErrorResponse(w, apiErr, r.URL) @@ -681,6 +689,14 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err = objectAPI.GetObjectInfo(ctx, bucket, object); err == nil { + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + } + if objectAPI.IsEncryptionSupported() { if hasSSECustomerHeader(r.Header) && !hasSuffix(object, slashSeparator) { // handle SSE-C requests reader, err = EncryptRequest(hashReader, r, metadata) @@ -753,6 +769,14 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r return } + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err := objectAPI.GetObjectInfo(ctx, bucket, object); err == nil { + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + } + // Validate storage class metadata if present if _, ok := r.Header[amzStorageClassCanonical]; ok { if !isValidStorageClassMeta(r.Header.Get(amzStorageClassCanonical)) { @@ -863,6 +887,14 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err = objectAPI.GetObjectInfo(ctx, dstBucket, dstObject); err == nil { + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + } + if objectAPI.IsEncryptionSupported() { if apiErr, _ := DecryptCopyObjectInfo(&srcInfo, r.Header); apiErr != ErrNone { writeErrorResponse(w, apiErr, r.URL) @@ -1119,6 +1151,14 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err = objectAPI.GetObjectInfo(ctx, bucket, object); err == nil { + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + } + if objectAPI.IsEncryptionSupported() { var li ListPartsInfo li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1) @@ -1200,6 +1240,14 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, return } + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err := objectAPI.GetObjectInfo(ctx, bucket, object); err == nil { + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + } + uploadID, _, _, _ := getObjectResources(r.URL.Query()) if err := objectAPI.AbortMultipartUpload(ctx, bucket, object, uploadID); err != nil { errorIf(err, "AbortMultipartUpload failed") @@ -1268,6 +1316,14 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite return } + // Deny if WORM is enabled + if globalWORMEnabled { + if _, err := objectAPI.GetObjectInfo(ctx, bucket, object); err == nil { + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + } + // Get upload id. uploadID, _, _, _ := getObjectResources(r.URL.Query()) @@ -1366,6 +1422,14 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. return } + // Deny if WORM is enabled + if globalWORMEnabled { + // Not required to check whether given object exists or not, because + // DeleteObject is always successful irrespective of object existence. + writeErrorResponse(w, ErrMethodNotAllowed, r.URL) + return + } + // http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html // Ignore delete object errors while replying to client, since we are // suppposed to reply only 204. Additionally log the error for diff --git a/cmd/server-main.go b/cmd/server-main.go index aa6cd3da5..1c5657843 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -76,6 +76,9 @@ ENVIRONMENT VARIABLES: DOMAIN: MINIO_DOMAIN: To enable virtual-host-style requests. Set this value to Minio host domain name. + WORM: + MINIO_WORM: To turn on Write-Once-Read-Many in server, set this value to "on". + EXAMPLES: 1. Start minio server on "/home/shared" directory. $ {{.HelpName}} /home/shared diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 9ceddde27..d181045ac 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. + * Minio Cloud Storage, (C) 2016, 2017, 2018 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -728,6 +728,13 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, } } + // Deny if WORM is enabled + if globalWORMEnabled { + if xl.isObject(bucket, object) { + return ObjectInfo{}, errors.Trace(ObjectAlreadyExists{Bucket: bucket, Object: object}) + } + } + // Rename the multipart object to final location. if _, err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, writeQuorum); err != nil { return oi, toObjectErr(err, bucket, object) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 7e1bd7c01..782291273 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. + * Minio Cloud Storage, (C) 2016, 2017, 2018 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -693,6 +693,13 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m return ObjectInfo{}, toObjectErr(err, bucket, object) } + // Deny if WORM is enabled + if globalWORMEnabled { + if xl.isObject(bucket, object) { + return ObjectInfo{}, errors.Trace(ObjectAlreadyExists{Bucket: bucket, Object: object}) + } + } + // Rename the successfully written temporary object to final location. if _, err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object)