From 545a9e4a820e8432a864398e6fbd2bf46c4fc917 Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Wed, 27 Dec 2017 10:06:16 +0530 Subject: [PATCH] Fix storage class related issues (#5322) - Add storage class metadata validation for request header - Change storage class header values to be consistent with AWS S3 - Refactor internal method to take only the reqd argument --- cmd/api-errors.go | 9 +++++++ cmd/common-main.go | 12 ++++----- cmd/gateway-handlers.go | 8 ++++++ cmd/object-handlers.go | 16 ++++++++++++ cmd/object-handlers_test.go | 14 ++++++++++ cmd/storage-class.go | 19 +++++++++++--- cmd/storage-class_test.go | 30 +++++++++++++++++++--- cmd/xl-v1-multipart.go | 2 +- cmd/xl-v1-object.go | 2 +- cmd/xl-v1.go | 4 +-- docs/erasure/storage-class/README.md | 38 ++++++++++++++-------------- 11 files changed, 117 insertions(+), 37 deletions(-) diff --git a/cmd/api-errors.go b/cmd/api-errors.go index ffb0a73bb..a38682b07 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -164,6 +164,10 @@ const ( ErrOperationTimedOut ErrPartsSizeUnequal ErrInvalidRequest + + // Minio storage class error codes + ErrInvalidStorageClass + // Add new extended error codes here. // Please open a https://github.com/minio/minio/issues before adding // new error codes here. @@ -194,6 +198,11 @@ var errorCodeResponse = map[APIErrorCode]APIError{ Description: "Unknown metadata directive.", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidStorageClass: { + Code: "InvalidStorageClass", + Description: "Invalid storage class.", + HTTPStatusCode: http.StatusBadRequest, + }, ErrInvalidRequestBody: { Code: "InvalidArgument", Description: "Body shouldn't be set for this request.", diff --git a/cmd/common-main.go b/cmd/common-main.go index d32fd727f..1faaf6cf8 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -119,27 +119,27 @@ func handleCommonEnvVars() { var err error // Check for environment variables and parse into storageClass struct - if ssc := os.Getenv(standardStorageClass); ssc != "" { + if ssc := os.Getenv(standardStorageClassEnv); ssc != "" { globalStandardStorageClass, err = parseStorageClass(ssc) - fatalIf(err, "Invalid value set in environment variable %s.", standardStorageClass) + fatalIf(err, "Invalid value set in environment variable %s.", standardStorageClassEnv) } - if rrsc := os.Getenv(reducedRedundancyStorageClass); rrsc != "" { + if rrsc := os.Getenv(reducedRedundancyStorageClassEnv); rrsc != "" { globalRRStorageClass, err = parseStorageClass(rrsc) - fatalIf(err, "Invalid value set in environment variable %s.", reducedRedundancyStorageClass) + fatalIf(err, "Invalid value set in environment variable %s.", reducedRedundancyStorageClassEnv) } // Validation is done after parsing both the storage classes. This is needed because we need one // storage class value to deduce the correct value of the other storage class. if globalRRStorageClass.Scheme != "" { err := validateRRSParity(globalRRStorageClass.Parity, globalStandardStorageClass.Parity) - fatalIf(err, "Invalid value set in environment variable %s.", reducedRedundancyStorageClass) + fatalIf(err, "Invalid value set in environment variable %s.", reducedRedundancyStorageClassEnv) globalIsStorageClass = true } if globalStandardStorageClass.Scheme != "" { err := validateSSParity(globalStandardStorageClass.Parity, globalRRStorageClass.Parity) - fatalIf(err, "Invalid value set in environment variable %s.", standardStorageClass) + fatalIf(err, "Invalid value set in environment variable %s.", standardStorageClassEnv) globalIsStorageClass = true } } diff --git a/cmd/gateway-handlers.go b/cmd/gateway-handlers.go index d86ec327d..2e77d4a18 100644 --- a/cmd/gateway-handlers.go +++ b/cmd/gateway-handlers.go @@ -186,6 +186,14 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re bucket = vars["bucket"] object = vars["object"] + // Validate storage class metadata if present + if _, ok := r.Header[amzStorageClassCanonical]; ok { + if !isValidStorageClassMeta(r.Header.Get(amzStorageClassCanonical)) { + writeErrorResponse(w, ErrInvalidStorageClass, r.URL) + return + } + } + // TODO: we should validate the object name here // Get Content-Md5 sent by client and verify if valid diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 2a592fcc8..5b78d7235 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -477,6 +477,14 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req bucket := vars["bucket"] object := vars["object"] + // Validate storage class metadata if present + if _, ok := r.Header[amzStorageClassCanonical]; ok { + if !isValidStorageClassMeta(r.Header.Get(amzStorageClassCanonical)) { + writeErrorResponse(w, ErrInvalidStorageClass, r.URL) + return + } + } + // Get Content-Md5 sent by client and verify if valid md5Bytes, err := checkValidMD5(r.Header.Get("Content-Md5")) if err != nil { @@ -659,6 +667,14 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r return } + // Validate storage class metadata if present + if _, ok := r.Header[amzStorageClassCanonical]; ok { + if !isValidStorageClassMeta(r.Header.Get(amzStorageClassCanonical)) { + writeErrorResponse(w, ErrInvalidStorageClass, r.URL) + return + } + } + if IsSSECustomerRequest(r.Header) { // handle SSE-C requests // SSE-C is not implemented for multipart operations yet writeErrorResponse(w, ErrNotImplemented, r.URL) diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go index 6732a35a3..9416cbbde 100644 --- a/cmd/object-handlers_test.go +++ b/cmd/object-handlers_test.go @@ -801,6 +801,8 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a copySourceHeader.Set("X-Amz-Copy-Source", "somewhere") invalidMD5Header := http.Header{} invalidMD5Header.Set("Content-Md5", "42") + inalidStorageClassHeader := http.Header{} + inalidStorageClassHeader.Set(amzStorageClass, "INVALID") addCustomHeaders := func(req *http.Request, customHeaders http.Header) { for k, values := range customHeaders { @@ -895,6 +897,18 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a fault: MissingContentLength, expectedRespStatus: http.StatusLengthRequired, }, + // Test case - 7. + // Test Case with invalid header key X-Amz-Storage-Class + { + bucketName: bucketName, + objectName: objectName, + headers: inalidStorageClassHeader, + data: bytesData, + dataLen: len(bytesData), + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + expectedRespStatus: http.StatusBadRequest, + }, } // Iterating over the cases, fetching the object validating the response. for i, testCase := range testCases { diff --git a/cmd/storage-class.go b/cmd/storage-class.go index 34dc5c8f0..dc56ebb75 100644 --- a/cmd/storage-class.go +++ b/cmd/storage-class.go @@ -26,10 +26,16 @@ import ( const ( // metadata entry for storage class amzStorageClass = "x-amz-storage-class" + // Canonical metadata entry for storage class + amzStorageClassCanonical = "X-Amz-Storage-Class" // Reduced redundancy storage class - reducedRedundancyStorageClass = "MINIO_STORAGE_CLASS_RRS" + reducedRedundancyStorageClass = "REDUCED_REDUNDANCY" // Standard storage class - standardStorageClass = "MINIO_STORAGE_CLASS_STANDARD" + standardStorageClass = "STANDARD" + // Reduced redundancy storage class environment variable + reducedRedundancyStorageClassEnv = "MINIO_STORAGE_CLASS_RRS" + // Standard storage class environment variable + standardStorageClassEnv = "MINIO_STORAGE_CLASS_STANDARD" // Supported storage class scheme is EC supportedStorageClassScheme = "EC" // Minimum parity disks @@ -48,6 +54,12 @@ type storageClassConfig struct { RRS string `json:"rrs"` } +// Validate if storage class in metadata +// Only Standard and RRS Storage classes are supported +func isValidStorageClassMeta(sc string) bool { + return sc == reducedRedundancyStorageClass || sc == standardStorageClass +} + // Parses given storageClassEnv and returns a storageClass structure. // Supported Storage Class format is "Scheme:Number of parity disks". // Currently only supported scheme is "EC". @@ -142,8 +154,7 @@ func validateSSParity(ssParity, rrsParity int) (err error) { // -- Default for Reduced Redundancy Storage class is, parity = 2 and data = N-Parity // -- Default for Standard Storage class is, parity = N/2, data = N/2 // If storage class is not present in metadata, default value is data = N/2, parity = N/2 -func getDrivesCount(sc string, disks []StorageAPI) (data, parity int) { - totalDisks := len(disks) +func getRedundancyCount(sc string, totalDisks int) (data, parity int) { parity = totalDisks / 2 switch sc { case reducedRedundancyStorageClass: diff --git a/cmd/storage-class_test.go b/cmd/storage-class_test.go index d31450483..a56a0982f 100644 --- a/cmd/storage-class_test.go +++ b/cmd/storage-class_test.go @@ -152,11 +152,11 @@ func testValidateSSParity(obj ObjectLayer, instanceType string, dirs []string, t } } -func TestGetDrivesCount(t *testing.T) { - ExecObjectLayerTestWithDirs(t, testGetDrivesCount) +func TestRedundancyCount(t *testing.T) { + ExecObjectLayerTestWithDirs(t, testGetRedundancyCount) } -func testGetDrivesCount(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { +func testGetRedundancyCount(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { // Reset global storage class flags resetGlobalStorageEnvs() xl := obj.(*xlObjects) @@ -183,7 +183,7 @@ func testGetDrivesCount(obj ObjectLayer, instanceType string, dirs []string, t T if tt.name == 5 { globalStandardStorageClass.Parity = 6 } - data, parity := getDrivesCount(tt.sc, tt.disks) + data, parity := getRedundancyCount(tt.sc, len(tt.disks)) if data != tt.expectedData { t.Errorf("Test %d, Expected data disks %d, got %d", tt.name, tt.expectedData, data) return @@ -353,3 +353,25 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin } } } + +// Test isValidStorageClassMeta method with valid and invalid inputs +func TestIsValidStorageClassMeta(t *testing.T) { + tests := []struct { + name int + sc string + want bool + }{ + {1, "STANDARD", true}, + {2, "REDUCED_REDUNDANCY", true}, + {3, "", false}, + {4, "INVALID", false}, + {5, "123", false}, + {6, "MINIO_STORAGE_CLASS_RRS", false}, + {7, "MINIO_STORAGE_CLASS_STANDARD", false}, + } + for _, tt := range tests { + if got := isValidStorageClassMeta(tt.sc); got != tt.want { + t.Errorf("Test %d, Expected Storage Class to be %t, got %t", tt.name, tt.want, got) + } + } +} diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 2486b0020..b633322e5 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -501,7 +501,7 @@ func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMark // operation(s) on the object. func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (string, error) { - dataBlocks, parityBlocks := getDrivesCount(meta[amzStorageClass], xl.storageDisks) + dataBlocks, parityBlocks := getRedundancyCount(meta[amzStorageClass], len(xl.storageDisks)) xlMeta := newXLMetaV1(object, dataBlocks, parityBlocks) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 48ecaad29..b4b999b1a 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -508,7 +508,7 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m } } // Get parity and data drive count based on storage class metadata - dataDrives, parityDrives := getDrivesCount(metadata[amzStorageClass], xl.storageDisks) + dataDrives, parityDrives := getRedundancyCount(metadata[amzStorageClass], len(xl.storageDisks)) // we now know the number of blocks this object needs for data and parity. // writeQuorum is dataBlocks + 1 diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 0c37e5136..5b7ae1a64 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -246,10 +246,10 @@ func getStorageInfo(disks []StorageAPI) StorageInfo { storageInfo.Backend.OnlineDisks = onlineDisks storageInfo.Backend.OfflineDisks = offlineDisks - _, scParity := getDrivesCount(standardStorageClass, disks) + _, scParity := getRedundancyCount(standardStorageClass, len(disks)) storageInfo.Backend.standardSCParity = scParity - _, rrSCparity := getDrivesCount(reducedRedundancyStorageClass, disks) + _, rrSCparity := getRedundancyCount(reducedRedundancyStorageClass, len(disks)) storageInfo.Backend.rrSCParity = rrSCparity return storageInfo diff --git a/docs/erasure/storage-class/README.md b/docs/erasure/storage-class/README.md index 28369a2c3..52a65e02c 100644 --- a/docs/erasure/storage-class/README.md +++ b/docs/erasure/storage-class/README.md @@ -9,27 +9,27 @@ set before starting Minio server. After the data and parity disks for each stora you can set the storage class of an object via request metadata field `x-amz-storage-class`. Minio server then honors the storage class by saving the object in specific number of data and parity disks. -### Values for standard storage class (SS) +### Values for standard storage class (STANDARD) -Standard storage class implies more parity than RRS class. So, SS parity disks should be +`STANDARD` storage class implies more parity than `REDUCED_REDUNDANCY` class. So, `STANDARD` parity disks should be -- Greater than or equal to 2, if RRS parity is not set. -- Greater than RRS parity, if it is set. +- Greater than or equal to 2, if `REDUCED_REDUNDANCY` parity is not set. +- Greater than `REDUCED_REDUNDANCY` parity, if it is set. -As parity blocks can not be higher than data blocks, Standard storage class can not be higher than N/2. (N being total number of disks) +Parity blocks can not be higher than data blocks, so `STANDARD` storage class parity can not be higher than N/2. (N being total number of disks) -Default value for standard storage class is `N/2` (N is the total number of drives). +Default value for `STANDARD` storage class is `N/2` (N is the total number of drives). -### Reduced redundancy storage class (RRS) +### Reduced redundancy storage class (REDUCED_REDUNDANCY) -Reduced redundancy implies lesser parity than SS class. So, RRS parity disks should be +`REDUCED_REDUNDANCY` implies lesser parity than `STANDARD` class. So,`REDUCED_REDUNDANCY` parity disks should be -- Less than N/2, if SS parity is not set. -- Less than SS Parity, if it is set. +- Less than N/2, if `STANDARD` parity is not set. +- Less than `STANDARD` Parity, if it is set. -As parity below 2 is not recommended, RR storage class is not supported for 4 disks erasure coding setup. +As parity below 2 is not recommended, `REDUCED_REDUNDANCY` storage class is not supported for 4 disks erasure coding setup. -Default value for reduced redundancy storage class is `2`. +Default value for `REDUCED_REDUNDANCY` storage class is `2`. ## Get started with Storage Class @@ -37,25 +37,25 @@ Default value for reduced redundancy storage class is `2`. The format to set storage class environment variables is as follows -`MINIO_STORAGE_CLASS_RRS=EC:parity` `MINIO_STORAGE_CLASS_STANDARD=EC:parity` +`MINIO_STORAGE_CLASS_RRS=EC:parity` -For example, set RRS parity 2 and SS parity 3, in 8 disk erasure code setup. +For example, set `MINIO_STORAGE_CLASS_RRS` parity 2 and `MINIO_STORAGE_CLASS_STANDARD` parity 3 ```sh -export MINIO_STORAGE_CLASS_RRS=EC:2 export MINIO_STORAGE_CLASS_STANDARD=EC:3 +export MINIO_STORAGE_CLASS_RRS=EC:2 ``` If storage class is not defined before starting Minio server, and subsequent PutObject metadata field has `x-amz-storage-class` present -with values `MINIO_STORAGE_CLASS_RRS` or `MINIO_STORAGE_CLASS_STANDARD`, Minio server uses default parity values. +with values `REDUCED_REDUNDANCY` or `STANDARD`, Minio server uses default parity values. ### Set metadata -In below example `minio-go` is used to set the storage class to `MINIO_STORAGE_CLASS_RRS`. This means this object will be split across 6 data disks and 2 parity disks (as per the storage class set in previous step). +In below example `minio-go` is used to set the storage class to `REDUCED_REDUNDANCY`. This means this object will be split across 6 data disks and 2 parity disks (as per the storage class set in previous step). ```go -s3Client, err := minio.New("s3.amazonaws.com", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) +s3Client, err := minio.New("localhost:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) if err != nil { log.Fatalln(err) } @@ -70,7 +70,7 @@ if err != nil { log.Fatalln(err) } -n, err := s3Client.PutObject("my-bucketname", "my-objectname", object, objectStat.Size(), minio.PutObjectOptions{ContentType: "application/octet-stream", StorageClass: "MINIO_STORAGE_CLASS_RRS"}) +n, err := s3Client.PutObject("my-bucketname", "my-objectname", object, objectStat.Size(), minio.PutObjectOptions{ContentType: "application/octet-stream", StorageClass: "REDUCED_REDUNDANCY"}) if err != nil { log.Fatalln(err) }