From 3b8adf7528abc2b3c139620cef7ea3169bdfee6e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 6 Oct 2019 22:50:24 -0700 Subject: [PATCH] Move storageclass config handling into cmd/config/storageclass (#8360) Continuation of the changes done in PR #8351 to refactor, add tests and move global handling into a more idiomatic style for Go as packages. --- cmd/common-main.go | 31 -- cmd/config-current.go | 159 +++----- cmd/config-current_test.go | 19 +- cmd/config-versions.go | 23 +- cmd/config/storageclass/storage-class.go | 230 ++++++++++++ cmd/config/storageclass/storage-class_test.go | 163 +++++++++ cmd/disk-cache-backend.go | 3 +- cmd/fs-v1-metadata.go | 3 +- cmd/gateway-main.go | 5 +- cmd/globals.go | 11 - cmd/handler-utils.go | 8 +- cmd/http/headers.go | 4 + cmd/object-api-utils.go | 5 +- cmd/object-handlers.go | 9 +- cmd/object-handlers_test.go | 3 +- cmd/storage-class.go | 213 ----------- cmd/storage-class_test.go | 346 ------------------ cmd/test-utils_test.go | 8 - cmd/xl-sets.go | 20 +- cmd/xl-v1-metadata.go | 22 +- cmd/xl-v1-multipart.go | 10 +- cmd/xl-v1-object.go | 14 +- cmd/xl-v1-object_test.go | 159 +++++++- cmd/xl-v1.go | 6 - pkg/event/target/amqp.go | 27 +- pkg/event/target/redis.go | 33 +- pkg/iam/openid/jwt.go | 75 ++-- pkg/iam/policy/opa.go | 35 +- 28 files changed, 806 insertions(+), 838 deletions(-) create mode 100644 cmd/config/storageclass/storage-class.go create mode 100644 cmd/config/storageclass/storage-class_test.go delete mode 100644 cmd/storage-class.go delete mode 100644 cmd/storage-class_test.go diff --git a/cmd/common-main.go b/cmd/common-main.go index a405c44fe..3a97be0e9 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -20,7 +20,6 @@ import ( "crypto/tls" "errors" "net" - "os" "path/filepath" "strings" "time" @@ -318,36 +317,6 @@ func handleCommonEnvVars() { // in-place update is off. globalInplaceUpdateDisabled = strings.EqualFold(env.Get("MINIO_UPDATE", "off"), "off") - // Validate and store the storage class env variables only for XL/Dist XL setups - if globalIsXL { - var err error - - // Check for environment variables and parse into storageClass struct - if ssc := os.Getenv(standardStorageClassEnv); ssc != "" { - globalStandardStorageClass, err = parseStorageClass(ssc) - logger.FatalIf(err, "Invalid value set in environment variable %s", standardStorageClassEnv) - } - - if rrsc := os.Getenv(reducedRedundancyStorageClassEnv); rrsc != "" { - globalRRStorageClass, err = parseStorageClass(rrsc) - logger.FatalIf(err, "Invalid value set in environment variable %s", reducedRedundancyStorageClassEnv) - } - - // Validation is done after parsing both the storage classes. This is needed because we need one - // storage class value to deduce the correct value of the other storage class. - if globalRRStorageClass.Scheme != "" { - err = validateParity(globalStandardStorageClass.Parity, globalRRStorageClass.Parity) - logger.FatalIf(err, "Invalid value set in environment variable %s", reducedRedundancyStorageClassEnv) - globalIsStorageClass = true - } - - if globalStandardStorageClass.Scheme != "" { - err = validateParity(globalStandardStorageClass.Parity, globalRRStorageClass.Parity) - logger.FatalIf(err, "Invalid value set in environment variable %s", standardStorageClassEnv) - globalIsStorageClass = true - } - } - // Get WORM environment variable. if worm := env.Get("MINIO_WORM", "off"); worm != "" { wormFlag, err := ParseBoolFlag(worm) diff --git a/cmd/config-current.go b/cmd/config-current.go index b09be2d4f..e075d6e45 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -27,6 +27,7 @@ import ( "github.com/minio/minio/cmd/config/cache" "github.com/minio/minio/cmd/config/compress" xldap "github.com/minio/minio/cmd/config/ldap" + "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -36,7 +37,6 @@ import ( "github.com/minio/minio/pkg/event/target" "github.com/minio/minio/pkg/iam/openid" iampolicy "github.com/minio/minio/pkg/iam/policy" - xnet "github.com/minio/minio/pkg/net" ) // Steps to move from version N to version N+1 @@ -82,6 +82,10 @@ func (s *serverConfig) GetRegion() string { // SetCredential sets new credential and returns the previous credential. func (s *serverConfig) SetCredential(creds auth.Credentials) (prevCred auth.Credentials) { + if s == nil { + return creds + } + if creds.IsValid() && globalActiveCred.IsValid() { globalActiveCred = creds } @@ -110,21 +114,13 @@ func (s *serverConfig) SetWorm(b bool) { s.Worm = BoolFlag(b) } -func (s *serverConfig) SetStorageClass(standardClass, rrsClass storageClass) { - s.StorageClass.Standard = standardClass - s.StorageClass.RRS = rrsClass -} - // GetStorageClass reads storage class fields from current config. // It returns the standard and reduced redundancy storage class struct -func (s *serverConfig) GetStorageClass() (storageClass, storageClass) { - if globalIsStorageClass { - return globalStandardStorageClass, globalRRStorageClass - } +func (s *serverConfig) GetStorageClass() storageclass.Config { if s == nil { - return storageClass{}, storageClass{} + return storageclass.Config{} } - return s.StorageClass.Standard, s.StorageClass.RRS + return s.StorageClass } // GetWorm get current credentials. @@ -234,37 +230,34 @@ func (s *serverConfig) Validate() error { return nil } -// SetCompressionConfig sets the current compression config -func (s *serverConfig) SetCompressionConfig(extensions []string, mimeTypes []string) { - s.Compression.Extensions = extensions - s.Compression.MimeTypes = mimeTypes - s.Compression.Enabled = globalIsCompressionEnabled -} - -// GetCompressionConfig gets the current compression config -func (s *serverConfig) GetCompressionConfig() compress.Config { - return s.Compression -} - -func (s *serverConfig) loadFromEnvs() { +func (s *serverConfig) lookupConfigs() { // If env is set override the credentials from config file. if globalIsEnvCreds { s.SetCredential(globalActiveCred) + } else { + globalActiveCred = s.GetCredential() } if globalIsEnvWORM { s.SetWorm(globalWORMEnabled) + } else { + globalWORMEnabled = s.GetWorm() } if globalIsEnvRegion { s.SetRegion(globalServerRegion) + } else { + globalServerRegion = s.GetRegion() } - if globalIsStorageClass { - s.SetStorageClass(globalStandardStorageClass, globalRRStorageClass) + var err error + if globalIsXL { + s.StorageClass, err = storageclass.LookupConfig(s.StorageClass, globalXLSetDriveCount) + if err != nil { + logger.FatalIf(err, "Unable to initialize storage class config") + } } - var err error s.Cache, err = cache.LookupConfig(s.Cache) if err != nil { logger.FatalIf(err, "Unable to setup cache") @@ -277,7 +270,6 @@ func (s *serverConfig) loadFromEnvs() { globalCacheExpiry = s.Cache.Expiry globalCacheMaxUse = s.Cache.MaxUse - var err error if cacheEncKey := env.Get(cache.EnvCacheEncryptionMasterKey, ""); cacheEncKey != "" { globalCacheKMSKeyID, globalCacheKMS, err = parseKMSMasterKey(cacheEncKey) if err != nil { @@ -296,30 +288,25 @@ func (s *serverConfig) loadFromEnvs() { logger.FatalIf(err, "Unable to setup Compression") } - if jwksURL, ok := env.Lookup("MINIO_IAM_JWKS_URL"); ok { - u, err := xnet.ParseURL(jwksURL) - if err != nil { - logger.FatalIf(err, "Unable to parse MINIO_IAM_JWKS_URL %s", jwksURL) - } - s.OpenID.JWKS.URL = u + if s.Compression.Enabled { + globalIsCompressionEnabled = s.Compression.Enabled + globalCompressExtensions = s.Compression.Extensions + globalCompressMimeTypes = s.Compression.MimeTypes } - if opaURL, ok := env.Lookup("MINIO_IAM_OPA_URL"); ok { - u, err := xnet.ParseURL(opaURL) - if err != nil { - logger.FatalIf(err, "Unable to parse MINIO_IAM_OPA_URL %s", opaURL) - } - opaArgs := iampolicy.OpaArgs{ - URL: u, - AuthToken: env.Get("MINIO_IAM_OPA_AUTHTOKEN", ""), - Transport: NewCustomHTTPTransport(), - CloseRespFn: xhttp.DrainBody, - } - logger.FatalIf(opaArgs.Validate(), "Unable to reach MINIO_IAM_OPA_URL %s", opaURL) - s.Policy.OPA.URL = opaArgs.URL - s.Policy.OPA.AuthToken = opaArgs.AuthToken + s.OpenID.JWKS, err = openid.LookupConfig(s.OpenID.JWKS, NewCustomHTTPTransport(), xhttp.DrainBody) + if err != nil { + logger.FatalIf(err, "Unable to initialize OpenID") + } + + s.Policy.OPA, err = iampolicy.LookupConfig(s.Policy.OPA, NewCustomHTTPTransport(), xhttp.DrainBody) + if err != nil { + logger.FatalIf(err, "Unable to initialize OPA") } + globalOpenIDValidators = getOpenIDValidators(s) + globalPolicyOPA = iampolicy.NewOpa(s.Policy.OPA) + s.LDAPServerConfig, err = xldap.Lookup(s.LDAPServerConfig, globalRootCAs) if err != nil { logger.FatalIf(err, "Unable to parse LDAP configuration from env") @@ -334,7 +321,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewAMQPTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewAMQPTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("amqp(%s): %s", k, err.Error()) } @@ -426,7 +413,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewRedisTarget(k, v, GlobalServiceDoneCh) + t, err := target.NewRedisTarget(k, v, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { return fmt.Errorf("redis(%s): %s", k, err.Error()) } @@ -495,9 +482,9 @@ func newServerConfig() *serverConfig { Version: serverConfigVersion, Credential: cred, Region: globalMinioDefaultRegion, - StorageClass: storageClassConfig{ - Standard: storageClass{}, - RRS: storageClass{}, + StorageClass: storageclass.Config{ + Standard: storageclass.StorageClass{}, + RRS: storageclass.StorageClass{}, }, Cache: cache.Config{ Drives: []string{}, @@ -550,56 +537,6 @@ func newServerConfig() *serverConfig { return srvCfg } -func (s *serverConfig) loadToCachedConfigs() { - if !globalIsEnvCreds { - globalActiveCred = s.GetCredential() - } - if !globalIsEnvWORM { - globalWORMEnabled = s.GetWorm() - } - if !globalIsEnvRegion { - globalServerRegion = s.GetRegion() - } - if !globalIsStorageClass { - globalStandardStorageClass, globalRRStorageClass = s.GetStorageClass() - } - if !globalIsDiskCacheEnabled { - cacheConf := s.GetCacheConfig() - globalCacheDrives = cacheConf.Drives - globalCacheExcludes = cacheConf.Exclude - globalCacheExpiry = cacheConf.Expiry - globalCacheMaxUse = cacheConf.MaxUse - } - if err := LookupKMSConfig(s.KMS); err != nil { - logger.FatalIf(err, "Unable to setup the KMS %s", s.KMS.Vault.Endpoint) - } - - if !globalIsCompressionEnabled { - compressionConf := s.GetCompressionConfig() - globalCompressExtensions = compressionConf.Extensions - globalCompressMimeTypes = compressionConf.MimeTypes - globalIsCompressionEnabled = compressionConf.Enabled - } - - if s.OpenID.JWKS.URL != nil && s.OpenID.JWKS.URL.String() != "" { - logger.FatalIf(s.OpenID.JWKS.PopulatePublicKey(), - "Unable to populate public key from JWKS URL %s", s.OpenID.JWKS.URL) - } - - globalOpenIDValidators = getOpenIDValidators(s) - - if s.Policy.OPA.URL != nil && s.Policy.OPA.URL.String() != "" { - opaArgs := iampolicy.OpaArgs{ - URL: s.Policy.OPA.URL, - AuthToken: s.Policy.OPA.AuthToken, - Transport: NewCustomHTTPTransport(), - CloseRespFn: xhttp.DrainBody, - } - logger.FatalIf(opaArgs.Validate(), "Unable to reach OPA URL %s", s.Policy.OPA.URL) - globalPolicyOPA = iampolicy.NewOpa(opaArgs) - } -} - // newSrvConfig - initialize a new server config, saves env parameters if // found, otherwise use default parameters func newSrvConfig(objAPI ObjectLayer) error { @@ -607,10 +544,7 @@ func newSrvConfig(objAPI ObjectLayer) error { srvCfg := newServerConfig() // Override any values from ENVs. - srvCfg.loadFromEnvs() - - // Load values to cached global values. - srvCfg.loadToCachedConfigs() + srvCfg.lookupConfigs() // hold the mutex lock before a new config is assigned. globalServerConfigMu.Lock() @@ -640,10 +574,7 @@ func loadConfig(objAPI ObjectLayer) error { } // Override any values from ENVs. - srvCfg.loadFromEnvs() - - // Load values to cached global values. - srvCfg.loadToCachedConfigs() + srvCfg.lookupConfigs() // hold the mutex lock before a new config is assigned. globalServerConfigMu.Lock() @@ -679,7 +610,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { } for id, args := range config.Notify.AMQP { if args.Enable { - newTarget, err := target.NewAMQPTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewAMQPTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue @@ -797,7 +728,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.Redis { if args.Enable { - newTarget, err := target.NewRedisTarget(id, args, GlobalServiceDoneCh) + newTarget, err := target.NewRedisTarget(id, args, GlobalServiceDoneCh, logger.LogOnceIf) if err != nil { logger.LogIf(context.Background(), err) continue diff --git a/cmd/config-current_test.go b/cmd/config-current_test.go index c1d65ae35..705c674b6 100644 --- a/cmd/config-current_test.go +++ b/cmd/config-current_test.go @@ -22,6 +22,7 @@ import ( "path" "testing" + "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/event/target" ) @@ -279,8 +280,22 @@ func TestConfigDiff(t *testing.T) { {&serverConfig{Region: "us-east-1"}, &serverConfig{Region: "us-west-1"}, "Region configuration differs"}, // 4 { - &serverConfig{StorageClass: storageClassConfig{storageClass{"1", 8}, storageClass{"2", 6}}}, - &serverConfig{StorageClass: storageClassConfig{storageClass{"1", 8}, storageClass{"2", 4}}}, + &serverConfig{StorageClass: storageclass.Config{ + Standard: storageclass.StorageClass{ + Parity: 8, + }, + RRS: storageclass.StorageClass{ + Parity: 6, + }, + }}, + &serverConfig{StorageClass: storageclass.Config{ + Standard: storageclass.StorageClass{ + Parity: 8, + }, + RRS: storageclass.StorageClass{ + Parity: 4, + }, + }}, "StorageClass configuration differs", }, // 5 diff --git a/cmd/config-versions.go b/cmd/config-versions.go index d4d6ace7e..12ed81f0e 100644 --- a/cmd/config-versions.go +++ b/cmd/config-versions.go @@ -22,6 +22,7 @@ import ( "github.com/minio/minio/cmd/config/cache" "github.com/minio/minio/cmd/config/compress" xldap "github.com/minio/minio/cmd/config/ldap" + "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/event/target" @@ -572,7 +573,7 @@ type serverConfigV22 struct { Domain string `json:"domain"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Notification queue configuration. Notify notifierV3 `json:"notify"` @@ -592,7 +593,7 @@ type serverConfigV23 struct { Domain string `json:"domain"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -616,7 +617,7 @@ type serverConfigV24 struct { Domain string `json:"domain"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -643,7 +644,7 @@ type serverConfigV25 struct { Domain string `json:"domain"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -667,7 +668,7 @@ type serverConfigV26 struct { Domain string `json:"domain"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -708,7 +709,7 @@ type serverConfigV27 struct { Domain string `json:"domain"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -736,7 +737,7 @@ type serverConfigV28 struct { Worm BoolFlag `json:"worm"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -765,7 +766,7 @@ type serverConfigV30 struct { Worm BoolFlag `json:"worm"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -793,7 +794,7 @@ type serverConfigV31 struct { Worm BoolFlag `json:"worm"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -848,7 +849,7 @@ type serverConfigV32 struct { Worm BoolFlag `json:"worm"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` @@ -892,7 +893,7 @@ type serverConfigV33 struct { Worm BoolFlag `json:"worm"` // Storage class configuration - StorageClass storageClassConfig `json:"storageclass"` + StorageClass storageclass.Config `json:"storageclass"` // Cache configuration Cache cache.Config `json:"cache"` diff --git a/cmd/config/storageclass/storage-class.go b/cmd/config/storageclass/storage-class.go new file mode 100644 index 000000000..472f0583c --- /dev/null +++ b/cmd/config/storageclass/storage-class.go @@ -0,0 +1,230 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package storageclass + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/minio/minio/cmd/config" + "github.com/minio/minio/pkg/env" +) + +// Standard constants for all storage class +const ( + // Reduced redundancy storage class + RRS = "REDUCED_REDUNDANCY" + // Standard storage class + STANDARD = "STANDARD" +) + +// Standard constats for config info storage class +const ( + // Reduced redundancy storage class environment variable + RRSEnv = "MINIO_STORAGE_CLASS_RRS" + // Standard storage class environment variable + StandardEnv = "MINIO_STORAGE_CLASS_STANDARD" + + // Supported storage class scheme is EC + schemePrefix = "EC" + + // Min parity disks + minParityDisks = 2 + + // Default RRS parity is always minimum parity. + defaultRRSParity = minParityDisks +) + +// StorageClass - holds storage class information +type StorageClass struct { + Parity int +} + +// Config storage class configuration +type Config struct { + Standard StorageClass `json:"standard"` + RRS StorageClass `json:"rrs"` +} + +// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON. +func (sCfg *Config) UnmarshalJSON(data []byte) error { + type Alias Config + aux := &struct { + *Alias + }{ + Alias: (*Alias)(sCfg), + } + return json.Unmarshal(data, &aux) +} + +// IsValid - returns true if input string is a valid +// storage class kind supported. +func IsValid(sc string) bool { + return sc == RRS || sc == STANDARD +} + +// UnmarshalText unmarshals storage class from its textual form into +// storageClass structure. +func (sc *StorageClass) UnmarshalText(b []byte) error { + scStr := string(b) + if scStr == "" { + return nil + } + s, err := parseStorageClass(scStr) + if err != nil { + return err + } + sc.Parity = s.Parity + return nil +} + +// MarshalText - marshals storage class string. +func (sc *StorageClass) MarshalText() ([]byte, error) { + if sc.Parity != 0 { + return []byte(fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)), nil + } + return []byte(""), nil +} + +func (sc *StorageClass) String() string { + if sc.Parity != 0 { + return fmt.Sprintf("%s:%d", schemePrefix, sc.Parity) + } + return "" +} + +// Parses given storageClassEnv and returns a storageClass structure. +// Supported Storage Class format is "Scheme:Number of parity disks". +// Currently only supported scheme is "EC". +func parseStorageClass(storageClassEnv string) (sc StorageClass, err error) { + s := strings.Split(storageClassEnv, ":") + + // only two elements allowed in the string - "scheme" and "number of parity disks" + if len(s) > 2 { + return StorageClass{}, config.ErrStorageClassValue(nil).Msg("Too many sections in " + storageClassEnv) + } else if len(s) < 2 { + return StorageClass{}, config.ErrStorageClassValue(nil).Msg("Too few sections in " + storageClassEnv) + } + + // only allowed scheme is "EC" + if s[0] != schemePrefix { + return StorageClass{}, config.ErrStorageClassValue(nil).Msg("Unsupported scheme " + s[0] + ". Supported scheme is EC") + } + + // Number of parity disks should be integer + parityDisks, err := strconv.Atoi(s[1]) + if err != nil { + return StorageClass{}, config.ErrStorageClassValue(err) + } + + return StorageClass{ + Parity: parityDisks, + }, nil +} + +// Validates the parity disks. +func validateParity(ssParity, rrsParity, drivesPerSet int) (err error) { + if ssParity == 0 && rrsParity == 0 { + return nil + } + + // SS parity disks should be greater than or equal to minParityDisks. + // Parity below minParityDisks is not supported. + if ssParity < minParityDisks { + return fmt.Errorf("Standard storage class parity %d should be greater than or equal to %d", + ssParity, minParityDisks) + } + + // RRS parity disks should be greater than or equal to minParityDisks. + // Parity below minParityDisks is not supported. + if rrsParity < minParityDisks { + return fmt.Errorf("Reduced redundancy storage class parity %d should be greater than or equal to %d", rrsParity, minParityDisks) + } + + if ssParity > drivesPerSet/2 { + return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, drivesPerSet/2) + } + + if rrsParity > drivesPerSet/2 { + return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, drivesPerSet/2) + } + + if ssParity > 0 && rrsParity > 0 { + if ssParity < rrsParity { + return fmt.Errorf("Standard storage class parity disks %d should be greater than or equal to Reduced redundancy storage class parity disks %d", ssParity, rrsParity) + } + } + return nil +} + +// GetParityForSC - Returns the data and parity drive count based on storage class +// If storage class is set using the env vars MINIO_STORAGE_CLASS_RRS and MINIO_STORAGE_CLASS_STANDARD +// or config.json fields +// -- corresponding values are returned +// If storage class is not set during startup, default values are returned +// -- Default for Reduced Redundancy Storage class is, parity = 2 and data = N-Parity +// -- Default for Standard Storage class is, parity = N/2, data = N/2 +// If storage class is empty +// -- standard storage class is assumed and corresponding data and parity is returned +func (sCfg Config) GetParityForSC(sc string) (parity int) { + switch strings.TrimSpace(sc) { + case RRS: + // set the rrs parity if available + if sCfg.RRS.Parity == 0 { + return defaultRRSParity + } + return sCfg.RRS.Parity + default: + return sCfg.Standard.Parity + } +} + +// LookupConfig - lookup storage class config and override with valid environment settings if any. +func LookupConfig(cfg Config, drivesPerSet int) (Config, error) { + var err error + + // Check for environment variables and parse into storageClass struct + if ssc := env.Get(StandardEnv, cfg.Standard.String()); ssc != "" { + cfg.Standard, err = parseStorageClass(ssc) + if err != nil { + return cfg, err + } + if cfg.Standard.Parity == 0 { + cfg.Standard.Parity = drivesPerSet / 2 + } + } + + if rrsc := env.Get(RRSEnv, cfg.RRS.String()); rrsc != "" { + cfg.RRS, err = parseStorageClass(rrsc) + if err != nil { + return cfg, err + } + if cfg.RRS.Parity == 0 { + cfg.RRS.Parity = defaultRRSParity + } + } + + // Validation is done after parsing both the storage classes. This is needed because we need one + // storage class value to deduce the correct value of the other storage class. + if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, drivesPerSet); err != nil { + return cfg, err + } + + return cfg, nil +} diff --git a/cmd/config/storageclass/storage-class_test.go b/cmd/config/storageclass/storage-class_test.go new file mode 100644 index 000000000..da47c33ec --- /dev/null +++ b/cmd/config/storageclass/storage-class_test.go @@ -0,0 +1,163 @@ +/* + * MinIO Cloud Storage, (C) 2017-2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package storageclass + +import ( + "errors" + "reflect" + "testing" +) + +func TestParseStorageClass(t *testing.T) { + tests := []struct { + storageClassEnv string + wantSc StorageClass + expectedError error + }{ + {"EC:3", StorageClass{ + Parity: 3}, + nil}, + {"EC:4", StorageClass{ + Parity: 4}, + nil}, + {"AB:4", StorageClass{ + Parity: 4}, + errors.New("Unsupported scheme AB. Supported scheme is EC")}, + {"EC:4:5", StorageClass{ + Parity: 4}, + errors.New("Too many sections in EC:4:5")}, + {"EC:A", StorageClass{ + Parity: 4}, + errors.New(`strconv.Atoi: parsing "A": invalid syntax`)}, + {"AB", StorageClass{ + Parity: 4}, + errors.New("Too few sections in AB")}, + } + for i, tt := range tests { + gotSc, err := parseStorageClass(tt.storageClassEnv) + if err != nil && tt.expectedError == nil { + t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err) + return + } + if err == nil && tt.expectedError != nil { + t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err) + return + } + if tt.expectedError == nil && !reflect.DeepEqual(gotSc, tt.wantSc) { + t.Errorf("Test %d, Expected %v, got %v", i+1, tt.wantSc, gotSc) + return + } + if tt.expectedError != nil && err.Error() != tt.expectedError.Error() { + t.Errorf("Test %d, Expected `%v`, got `%v`", i+1, tt.expectedError, err) + } + } +} + +func TestValidateParity(t *testing.T) { + tests := []struct { + rrsParity int + ssParity int + success bool + drivesPerSet int + }{ + {2, 4, true, 16}, + {3, 3, true, 16}, + {0, 0, true, 16}, + {1, 4, false, 16}, + {7, 6, false, 16}, + {9, 0, false, 16}, + {9, 9, false, 16}, + {2, 9, false, 16}, + {9, 2, false, 16}, + } + for i, tt := range tests { + err := validateParity(tt.ssParity, tt.rrsParity, tt.drivesPerSet) + if err != nil && tt.success { + t.Errorf("Test %d, Expected success, got %s", i+1, err) + } + if err == nil && !tt.success { + t.Errorf("Test %d, Expected failure, got success", i+1) + } + } +} + +func TestParityCount(t *testing.T) { + tests := []struct { + sc string + disksCount int + expectedData int + expectedParity int + }{ + {RRS, 16, 14, 2}, + {STANDARD, 16, 8, 8}, + {"", 16, 8, 8}, + {RRS, 16, 9, 7}, + {STANDARD, 16, 10, 6}, + {"", 16, 9, 7}, + } + for i, tt := range tests { + scfg := Config{ + Standard: StorageClass{ + Parity: 8, + }, + RRS: StorageClass{ + Parity: 0, + }, + } + // Set env var for test case 4 + if i+1 == 4 { + scfg.RRS.Parity = 7 + } + // Set env var for test case 5 + if i+1 == 5 { + scfg.Standard.Parity = 6 + } + // Set env var for test case 6 + if i+1 == 6 { + scfg.Standard.Parity = 7 + } + parity := scfg.GetParityForSC(tt.sc) + if (tt.disksCount - parity) != tt.expectedData { + t.Errorf("Test %d, Expected data disks %d, got %d", i+1, tt.expectedData, tt.disksCount-parity) + continue + } + if parity != tt.expectedParity { + t.Errorf("Test %d, Expected parity disks %d, got %d", i+1, tt.expectedParity, parity) + } + } +} + +// Test IsValid method with valid and invalid inputs +func TestIsValidStorageClassKind(t *testing.T) { + tests := []struct { + sc string + want bool + }{ + {"STANDARD", true}, + {"REDUCED_REDUNDANCY", true}, + {"", false}, + {"INVALID", false}, + {"123", false}, + {"MINIO_STORAGE_CLASS_RRS", false}, + {"MINIO_STORAGE_CLASS_STANDARD", false}, + } + for i, tt := range tests { + if got := IsValid(tt.sc); got != tt.want { + t.Errorf("Test %d, Expected Storage Class to be %t, got %t", i+1, tt.want, got) + } + } +} diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index 736ed18ff..f2c5c798b 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -35,6 +35,7 @@ import ( "github.com/djherbis/atime" "github.com/minio/minio/cmd/crypto" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/disk" "github.com/minio/sio" @@ -86,7 +87,7 @@ func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) { o.ETag = extractETag(m.Meta) o.ContentType = m.Meta["content-type"] o.ContentEncoding = m.Meta["content-encoding"] - if storageClass, ok := m.Meta[amzStorageClass]; ok { + if storageClass, ok := m.Meta[xhttp.AmzStorageClass]; ok { o.StorageClass = storageClass } else { o.StorageClass = globalMinioDefaultStorageClass diff --git a/cmd/fs-v1-metadata.go b/cmd/fs-v1-metadata.go index 76dbebd93..f684e581e 100644 --- a/cmd/fs-v1-metadata.go +++ b/cmd/fs-v1-metadata.go @@ -28,6 +28,7 @@ import ( "time" jsoniter "github.com/json-iterator/go" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/lock" "github.com/minio/minio/pkg/mimedb" @@ -166,7 +167,7 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo objInfo.ETag = extractETag(m.Meta) objInfo.ContentType = m.Meta["content-type"] objInfo.ContentEncoding = m.Meta["content-encoding"] - if storageClass, ok := m.Meta[amzStorageClass]; ok { + if storageClass, ok := m.Meta[xhttp.AmzStorageClass]; ok { objInfo.StorageClass = storageClass } else { objInfo.StorageClass = globalMinioDefaultStorageClass diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 71f518307..411da0937 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -212,10 +212,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { srvCfg := newServerConfig() // Override any values from ENVs. - srvCfg.loadFromEnvs() - - // Load values to cached global values. - srvCfg.loadToCachedConfigs() + srvCfg.lookupConfigs() // hold the mutex lock before a new config is assigned. globalServerConfigMu.Lock() diff --git a/cmd/globals.go b/cmd/globals.go index 915051c32..bd036b656 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -198,14 +198,6 @@ var ( globalOperationTimeout = newDynamicTimeout(10*time.Minute /*30*/, 600*time.Second) // default timeout for general ops globalHealingTimeout = newDynamicTimeout(30*time.Minute /*1*/, 30*time.Minute) // timeout for healing related ops - // Storage classes - // Set to indicate if storage class is set up - globalIsStorageClass bool - // Set to store reduced redundancy storage class - globalRRStorageClass storageClass - // Set to store standard storage class - globalStandardStorageClass storageClass - globalIsEnvWORM bool // Is worm enabled globalWORMEnabled bool @@ -249,9 +241,6 @@ var ( // configuration must be present. globalAutoEncryption bool - // Is compression include extensions/content-types set? - globalIsEnvCompression bool - // Is compression enabled? globalIsCompressionEnabled = false diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index 068fb54ef..adc721995 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -65,14 +65,14 @@ var supportedHeaders = []string{ "content-language", "content-encoding", "content-disposition", - amzStorageClass, + xhttp.AmzStorageClass, "expires", // Add more supported headers here. } // isMetadataDirectiveValid - check if metadata-directive is valid. func isMetadataDirectiveValid(h http.Header) bool { - _, ok := h[http.CanonicalHeaderKey("X-Amz-Metadata-Directive")] + _, ok := h[http.CanonicalHeaderKey(xhttp.AmzMetadataDirective)] if ok { // Check atleast set metadata-directive is valid. return (isMetadataCopy(h) || isMetadataReplace(h)) @@ -84,12 +84,12 @@ func isMetadataDirectiveValid(h http.Header) bool { // Check if the metadata COPY is requested. func isMetadataCopy(h http.Header) bool { - return h.Get("X-Amz-Metadata-Directive") == "COPY" + return h.Get(xhttp.AmzMetadataDirective) == "COPY" } // Check if the metadata REPLACE is requested. func isMetadataReplace(h http.Header) bool { - return h.Get("X-Amz-Metadata-Directive") == "REPLACE" + return h.Get(xhttp.AmzMetadataDirective) == "REPLACE" } // Splits an incoming path into bucket and object components. diff --git a/cmd/http/headers.go b/cmd/http/headers.go index ae456947f..793f580ff 100644 --- a/cmd/http/headers.go +++ b/cmd/http/headers.go @@ -47,6 +47,9 @@ const ( IfMatch = "If-Match" IfNoneMatch = "If-None-Match" + // S3 storage class + AmzStorageClass = "x-amz-storage-class" + // S3 extensions AmzCopySourceIfModifiedSince = "x-amz-copy-source-if-modified-since" AmzCopySourceIfUnmodifiedSince = "x-amz-copy-source-if-unmodified-since" @@ -57,6 +60,7 @@ const ( AmzCopySource = "X-Amz-Copy-Source" AmzCopySourceVersionID = "X-Amz-Copy-Source-Version-Id" AmzCopySourceRange = "X-Amz-Copy-Source-Range" + AmzMetadataDirective = "X-Amz-Metadata-Directive" // Signature V4 related contants. AmzContentSha256 = "X-Amz-Content-Sha256" diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 2e88cfea6..0a1bfd80e 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -36,6 +36,7 @@ import ( "github.com/klauspost/compress/s2" "github.com/klauspost/readahead" "github.com/minio/minio-go/v6/pkg/s3utils" + "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -239,8 +240,8 @@ func cleanMetadata(metadata map[string]string) map[string]string { // Filter X-Amz-Storage-Class field only if it is set to STANDARD. // This is done since AWS S3 doesn't return STANDARD Storage class as response header. func removeStandardStorageClass(metadata map[string]string) map[string]string { - if metadata[amzStorageClass] == standardStorageClass { - delete(metadata, amzStorageClass) + if metadata[xhttp.AmzStorageClass] == storageclass.STANDARD { + delete(metadata, xhttp.AmzStorageClass) } return metadata } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d5b781e7b..5a5c4a7d9 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -35,6 +35,7 @@ import ( "github.com/gorilla/mux" miniogo "github.com/minio/minio-go/v6" "github.com/minio/minio-go/v6/pkg/encrypt" + "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -1063,8 +1064,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } // Validate storage class metadata if present - if _, ok := r.Header[amzStorageClassCanonical]; ok { - if !isValidStorageClassMeta(r.Header.Get(amzStorageClassCanonical)) { + if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { + if !storageclass.IsValid(sc) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL, guessIsBrowserReq(r)) return } @@ -1355,8 +1356,8 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r } // Validate storage class metadata if present - if _, ok := r.Header[amzStorageClassCanonical]; ok { - if !isValidStorageClassMeta(r.Header.Get(amzStorageClassCanonical)) { + if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { + if !storageclass.IsValid(sc) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL, guessIsBrowserReq(r)) return } diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go index 39bdb7993..592029553 100644 --- a/cmd/object-handlers_test.go +++ b/cmd/object-handlers_test.go @@ -38,6 +38,7 @@ import ( humanize "github.com/dustin/go-humanize" "github.com/minio/minio/cmd/crypto" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/pkg/auth" ioutilx "github.com/minio/minio/pkg/ioutil" ) @@ -1181,7 +1182,7 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a invalidMD5Header := http.Header{} invalidMD5Header.Set("Content-Md5", "42") inalidStorageClassHeader := http.Header{} - inalidStorageClassHeader.Set(amzStorageClass, "INVALID") + inalidStorageClassHeader.Set(xhttp.AmzStorageClass, "INVALID") addCustomHeaders := func(req *http.Request, customHeaders http.Header) { for k, values := range customHeaders { diff --git a/cmd/storage-class.go b/cmd/storage-class.go deleted file mode 100644 index ba610ec3a..000000000 --- a/cmd/storage-class.go +++ /dev/null @@ -1,213 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2017 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "context" - "encoding/json" - "fmt" - "strconv" - "strings" - - "github.com/minio/minio/cmd/config" -) - -const ( - // metadata entry for storage class - amzStorageClass = "x-amz-storage-class" - // Canonical metadata entry for storage class - amzStorageClassCanonical = "X-Amz-Storage-Class" - // Reduced redundancy storage class - reducedRedundancyStorageClass = "REDUCED_REDUNDANCY" - // Standard storage class - standardStorageClass = "STANDARD" - // Reduced redundancy storage class environment variable - reducedRedundancyStorageClassEnv = "MINIO_STORAGE_CLASS_RRS" - // Standard storage class environment variable - standardStorageClassEnv = "MINIO_STORAGE_CLASS_STANDARD" - // Supported storage class scheme is EC - supportedStorageClassScheme = "EC" - // Minimum parity disks - minimumParityDisks = 2 - defaultRRSParity = 2 -) - -// Struct to hold storage class -type storageClass struct { - Scheme string - Parity int -} - -type storageClassConfig struct { - Standard storageClass `json:"standard"` - RRS storageClass `json:"rrs"` -} - -// Validate SS and RRS parity when unmarshalling JSON. -func (sCfg *storageClassConfig) UnmarshalJSON(data []byte) error { - type Alias storageClassConfig - aux := &struct { - *Alias - }{ - Alias: (*Alias)(sCfg), - } - if err := json.Unmarshal(data, &aux); err != nil { - return err - } - return validateParity(aux.Standard.Parity, aux.RRS.Parity) -} - -// Validate if storage class in metadata -// Only Standard and RRS Storage classes are supported -func isValidStorageClassMeta(sc string) bool { - return sc == reducedRedundancyStorageClass || sc == standardStorageClass -} - -func (sc *storageClass) UnmarshalText(b []byte) error { - scStr := string(b) - if scStr == "" { - return nil - } - s, err := parseStorageClass(scStr) - if err != nil { - return err - } - sc.Parity = s.Parity - sc.Scheme = s.Scheme - return nil -} - -func (sc *storageClass) MarshalText() ([]byte, error) { - if sc.Scheme != "" && sc.Parity != 0 { - return []byte(fmt.Sprintf("%s:%d", sc.Scheme, sc.Parity)), nil - } - return []byte(""), nil -} - -// Parses given storageClassEnv and returns a storageClass structure. -// Supported Storage Class format is "Scheme:Number of parity disks". -// Currently only supported scheme is "EC". -func parseStorageClass(storageClassEnv string) (sc storageClass, err error) { - s := strings.Split(storageClassEnv, ":") - - // only two elements allowed in the string - "scheme" and "number of parity disks" - if len(s) > 2 { - return storageClass{}, config.ErrStorageClassValue(nil).Msg("Too many sections in " + storageClassEnv) - } else if len(s) < 2 { - return storageClass{}, config.ErrStorageClassValue(nil).Msg("Too few sections in " + storageClassEnv) - } - - // only allowed scheme is "EC" - if s[0] != supportedStorageClassScheme { - return storageClass{}, config.ErrStorageClassValue(nil).Msg("Unsupported scheme " + s[0] + ". Supported scheme is EC") - } - - // Number of parity disks should be integer - parityDisks, err := strconv.Atoi(s[1]) - if err != nil { - return storageClass{}, config.ErrStorageClassValue(err) - } - - sc = storageClass{ - Scheme: s[0], - Parity: parityDisks, - } - - return sc, nil -} - -// Validates the parity disks. -func validateParity(ssParity, rrsParity int) (err error) { - if ssParity == 0 && rrsParity == 0 { - return nil - } - - if !globalIsXL { - return fmt.Errorf("Setting storage class only allowed for erasure coding mode") - } - - // SS parity disks should be greater than or equal to minimumParityDisks. Parity below minimumParityDisks is not recommended. - if ssParity > 0 && ssParity < minimumParityDisks { - return fmt.Errorf("Standard storage class parity %d should be greater than or equal to %d", ssParity, minimumParityDisks) - } - - // RRS parity disks should be greater than or equal to minimumParityDisks. Parity below minimumParityDisks is not recommended. - if rrsParity > 0 && rrsParity < minimumParityDisks { - return fmt.Errorf("Reduced redundancy storage class parity %d should be greater than or equal to %d", rrsParity, minimumParityDisks) - } - - if ssParity > globalXLSetDriveCount/2 { - return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, globalXLSetDriveCount/2) - } - - if rrsParity > globalXLSetDriveCount/2 { - return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, globalXLSetDriveCount/2) - } - - if ssParity > 0 && rrsParity > 0 { - if ssParity < rrsParity { - return fmt.Errorf("Standard storage class parity disks %d should be greater than or equal to Reduced redundancy storage class parity disks %d", ssParity, rrsParity) - } - } - return nil -} - -// Returns the data and parity drive count based on storage class -// If storage class is set using the env vars MINIO_STORAGE_CLASS_RRS and MINIO_STORAGE_CLASS_STANDARD -// or config.json fields -// -- corresponding values are returned -// If storage class is not set during startup, default values are returned -// -- Default for Reduced Redundancy Storage class is, parity = 2 and data = N-Parity -// -- Default for Standard Storage class is, parity = N/2, data = N/2 -// If storage class is empty -// -- standard storage class is assumed and corresponding data and parity is returned -func getRedundancyCount(sc string, totalDisks int) (data, parity int) { - parity = totalDisks / 2 - switch sc { - case reducedRedundancyStorageClass: - if globalRRStorageClass.Parity != 0 { - // set the rrs parity if available - parity = globalRRStorageClass.Parity - } else { - // else fall back to default value - parity = defaultRRSParity - } - case standardStorageClass, "": - if globalStandardStorageClass.Parity != 0 { - // set the standard parity if available - parity = globalStandardStorageClass.Parity - } - } - // data is always totalDisks - parity - return totalDisks - parity, parity -} - -// Returns per object readQuorum and writeQuorum -// readQuorum is the minimum required disks to read data. -// writeQuorum is the minimum required disks to write data. -func objectQuorumFromMeta(ctx context.Context, xl xlObjects, partsMetaData []xlMetaV1, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) { - // get the latest updated Metadata and a count of all the latest updated xlMeta(s) - latestXLMeta, err := getLatestXLMeta(ctx, partsMetaData, errs) - - if err != nil { - return 0, 0, err - } - - // Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks - // from latestXLMeta to get the quorum - return latestXLMeta.Erasure.DataBlocks, latestXLMeta.Erasure.DataBlocks + 1, nil -} diff --git a/cmd/storage-class_test.go b/cmd/storage-class_test.go deleted file mode 100644 index c327cefee..000000000 --- a/cmd/storage-class_test.go +++ /dev/null @@ -1,346 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2017 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "bytes" - "context" - "errors" - "reflect" - "testing" -) - -func TestParseStorageClass(t *testing.T) { - ExecObjectLayerTest(t, testParseStorageClass) -} - -func testParseStorageClass(obj ObjectLayer, instanceType string, t TestErrHandler) { - tests := []struct { - storageClassEnv string - wantSc storageClass - expectedError error - }{ - {"EC:3", storageClass{ - Scheme: "EC", - Parity: 3}, - nil}, - {"EC:4", storageClass{ - Scheme: "EC", - Parity: 4}, - nil}, - {"AB:4", storageClass{ - Scheme: "EC", - Parity: 4}, - errors.New("Unsupported scheme AB. Supported scheme is EC")}, - {"EC:4:5", storageClass{ - Scheme: "EC", - Parity: 4}, - errors.New("Too many sections in EC:4:5")}, - {"AB", storageClass{ - Scheme: "EC", - Parity: 4}, - errors.New("Too few sections in AB")}, - } - for i, tt := range tests { - gotSc, err := parseStorageClass(tt.storageClassEnv) - if err != nil && tt.expectedError == nil { - t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err) - return - } - if err == nil && tt.expectedError != nil { - t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err) - return - } - if tt.expectedError == nil && !reflect.DeepEqual(gotSc, tt.wantSc) { - t.Errorf("Test %d, Expected %v, got %v", i+1, tt.wantSc, gotSc) - return - } - if tt.expectedError != nil && err.Error() != tt.expectedError.Error() { - t.Errorf("Test %d, Expected `%v`, got `%v`", i+1, tt.expectedError, err) - } - } -} - -func TestValidateParity(t *testing.T) { - ExecObjectLayerTestWithDirs(t, testValidateParity) -} - -func testValidateParity(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { - // Reset global storage class flags - resetGlobalStorageEnvs() - - // Set proper envs for a single node XL setup. - saveIsXL := globalIsXL - defer func() { - globalIsXL = saveIsXL - }() - globalIsXL = true - saveSetDriveCount := globalXLSetDriveCount - defer func() { - globalXLSetDriveCount = saveSetDriveCount - }() - globalXLSetCount = len(dirs) - - tests := []struct { - rrsParity int - ssParity int - success bool - }{ - {2, 4, true}, - {3, 3, true}, - {1, 4, false}, - {7, 6, false}, - {9, 0, false}, - {9, 9, false}, - {2, 9, false}, - } - for i, tt := range tests { - err := validateParity(tt.ssParity, tt.rrsParity) - if err != nil && tt.success { - t.Errorf("Test %d, Expected success, got %s", i+1, err) - } - if err == nil && !tt.success { - t.Errorf("Test %d, Expected failure, got success", i+1) - } - } -} - -func TestRedundancyCount(t *testing.T) { - ExecObjectLayerTestWithDirs(t, testGetRedundancyCount) -} - -func testGetRedundancyCount(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { - // Reset global storage class flags - resetGlobalStorageEnvs() - xl := obj.(*xlObjects) - - tests := []struct { - sc string - disksCount int - expectedData int - expectedParity int - }{ - {reducedRedundancyStorageClass, len(xl.storageDisks), 14, 2}, - {standardStorageClass, len(xl.storageDisks), 8, 8}, - {"", len(xl.storageDisks), 8, 8}, - {reducedRedundancyStorageClass, len(xl.storageDisks), 9, 7}, - {standardStorageClass, len(xl.storageDisks), 10, 6}, - {"", len(xl.storageDisks), 9, 7}, - } - for i, tt := range tests { - // Set env var for test case 4 - if i+1 == 4 { - globalRRStorageClass.Parity = 7 - } - // Set env var for test case 5 - if i+1 == 5 { - globalStandardStorageClass.Parity = 6 - } - // Set env var for test case 6 - if i+1 == 6 { - globalStandardStorageClass.Parity = 7 - } - data, parity := getRedundancyCount(tt.sc, tt.disksCount) - if data != tt.expectedData { - t.Errorf("Test %d, Expected data disks %d, got %d", i+1, tt.expectedData, data) - return - } - if parity != tt.expectedParity { - t.Errorf("Test %d, Expected parity disks %d, got %d", i+1, tt.expectedParity, parity) - return - } - } -} - -func TestObjectQuorumFromMeta(t *testing.T) { - ExecObjectLayerTestWithDirs(t, testObjectQuorumFromMeta) -} - -func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { - // Reset global storage class flags - resetGlobalStorageEnvs() - bucket := getRandomBucketName() - - var opts ObjectOptions - // make data with more than one part - partCount := 3 - data := bytes.Repeat([]byte("a"), int(globalPutPartSize)*partCount) - xl := obj.(*xlObjects) - xlDisks := xl.storageDisks - - err := obj.MakeBucketWithLocation(context.Background(), bucket, globalMinioDefaultRegion) - if err != nil { - t.Fatalf("Failed to make a bucket %v", err) - } - - // Object for test case 1 - No StorageClass defined, no MetaData in PutObject - object1 := "object1" - _, err = obj.PutObject(context.Background(), bucket, object1, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts) - if err != nil { - t.Fatalf("Failed to putObject %v", err) - } - - parts1, errs1 := readAllXLMetadata(context.Background(), xlDisks, bucket, object1) - - // Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class - object2 := "object2" - metadata2 := make(map[string]string) - metadata2["x-amz-storage-class"] = reducedRedundancyStorageClass - _, err = obj.PutObject(context.Background(), bucket, object2, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata2}) - if err != nil { - t.Fatalf("Failed to putObject %v", err) - } - - parts2, errs2 := readAllXLMetadata(context.Background(), xlDisks, bucket, object2) - - // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class - object3 := "object3" - metadata3 := make(map[string]string) - metadata3["x-amz-storage-class"] = standardStorageClass - _, err = obj.PutObject(context.Background(), bucket, object3, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata3}) - if err != nil { - t.Fatalf("Failed to putObject %v", err) - } - - parts3, errs3 := readAllXLMetadata(context.Background(), xlDisks, bucket, object3) - - // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class - object4 := "object4" - metadata4 := make(map[string]string) - metadata4["x-amz-storage-class"] = standardStorageClass - globalStandardStorageClass = storageClass{ - Parity: 6, - Scheme: "EC", - } - - _, err = obj.PutObject(context.Background(), bucket, object4, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata4}) - if err != nil { - t.Fatalf("Failed to putObject %v", err) - } - - parts4, errs4 := readAllXLMetadata(context.Background(), xlDisks, bucket, object4) - - // Object for test case 5 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting RRS Class - // Reset global storage class flags - resetGlobalStorageEnvs() - object5 := "object5" - metadata5 := make(map[string]string) - metadata5["x-amz-storage-class"] = reducedRedundancyStorageClass - globalRRStorageClass = storageClass{ - Parity: 2, - Scheme: "EC", - } - - _, err = obj.PutObject(context.Background(), bucket, object5, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata5}) - if err != nil { - t.Fatalf("Failed to putObject %v", err) - } - - parts5, errs5 := readAllXLMetadata(context.Background(), xlDisks, bucket, object5) - - // Object for test case 6 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting Standard Storage Class - // Reset global storage class flags - resetGlobalStorageEnvs() - object6 := "object6" - metadata6 := make(map[string]string) - metadata6["x-amz-storage-class"] = standardStorageClass - globalRRStorageClass = storageClass{ - Parity: 2, - Scheme: "EC", - } - - _, err = obj.PutObject(context.Background(), bucket, object6, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata6}) - if err != nil { - t.Fatalf("Failed to putObject %v", err) - } - - parts6, errs6 := readAllXLMetadata(context.Background(), xlDisks, bucket, object6) - - // Object for test case 7 - Standard StorageClass defined as Parity 5, MetaData in PutObject requesting RRS Class - // Reset global storage class flags - resetGlobalStorageEnvs() - object7 := "object7" - metadata7 := make(map[string]string) - metadata7["x-amz-storage-class"] = reducedRedundancyStorageClass - globalStandardStorageClass = storageClass{ - Parity: 5, - Scheme: "EC", - } - - _, err = obj.PutObject(context.Background(), bucket, object7, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata7}) - if err != nil { - t.Fatalf("Failed to putObject %v", err) - } - - parts7, errs7 := readAllXLMetadata(context.Background(), xlDisks, bucket, object7) - - tests := []struct { - parts []xlMetaV1 - errs []error - expectedReadQuorum int - expectedWriteQuorum int - expectedError error - }{ - {parts1, errs1, 8, 9, nil}, - {parts2, errs2, 14, 15, nil}, - {parts3, errs3, 8, 9, nil}, - {parts4, errs4, 10, 11, nil}, - {parts5, errs5, 14, 15, nil}, - {parts6, errs6, 8, 9, nil}, - {parts7, errs7, 14, 15, nil}, - } - for i, tt := range tests { - actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(context.Background(), *xl, tt.parts, tt.errs) - if tt.expectedError != nil && err == nil { - t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err) - return - } - if tt.expectedError == nil && err != nil { - t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err) - return - } - if tt.expectedReadQuorum != actualReadQuorum { - t.Errorf("Test %d, Expected Read Quorum %d, got %d", i+1, tt.expectedReadQuorum, actualReadQuorum) - return - } - if tt.expectedWriteQuorum != actualWriteQuorum { - t.Errorf("Test %d, Expected Write Quorum %d, got %d", i+1, tt.expectedWriteQuorum, actualWriteQuorum) - return - } - } -} - -// Test isValidStorageClassMeta method with valid and invalid inputs -func TestIsValidStorageClassMeta(t *testing.T) { - tests := []struct { - sc string - want bool - }{ - {"STANDARD", true}, - {"REDUCED_REDUNDANCY", true}, - {"", false}, - {"INVALID", false}, - {"123", false}, - {"MINIO_STORAGE_CLASS_RRS", false}, - {"MINIO_STORAGE_CLASS_STANDARD", false}, - } - for i, tt := range tests { - if got := isValidStorageClassMeta(tt.sc); got != tt.want { - t.Errorf("Test %d, Expected Storage Class to be %t, got %t", i+1, tt.want, got) - } - } -} diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 1e9bf3735..fa0083d88 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -468,12 +468,6 @@ func resetGlobalIsEnvs() { globalIsEnvWORM = false globalIsEnvBrowser = false globalIsEnvRegion = false - globalIsStorageClass = false -} - -func resetGlobalStorageEnvs() { - globalStandardStorageClass = storageClass{} - globalRRStorageClass = storageClass{} } // reset global heal state @@ -523,8 +517,6 @@ func resetTestGlobals() { resetGlobalIsXL() // Reset global isEnvCreds flag. resetGlobalIsEnvs() - // Reset global storage class flags - resetGlobalStorageEnvs() // Reset global heal state resetGlobalHealState() //Reset global disk cache flags diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index bbe1a8122..49499e51b 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -27,6 +27,8 @@ import ( "sync" "time" + "github.com/minio/minio/cmd/config/storageclass" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bpool" "github.com/minio/minio/pkg/lifecycle" @@ -326,13 +328,17 @@ func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo { storageInfo.Backend.OfflineDisks += lstorageInfo.Backend.OfflineDisks } - scData, scParity := getRedundancyCount(standardStorageClass, s.drivesPerSet) - storageInfo.Backend.StandardSCData = scData + scfg := globalServerConfig.GetStorageClass() + scParity := scfg.GetParityForSC(storageclass.STANDARD) + if scParity == 0 { + scParity = s.drivesPerSet / 2 + } + storageInfo.Backend.StandardSCData = s.drivesPerSet - scParity storageInfo.Backend.StandardSCParity = scParity - rrSCData, rrSCparity := getRedundancyCount(reducedRedundancyStorageClass, s.drivesPerSet) - storageInfo.Backend.RRSCData = rrSCData - storageInfo.Backend.RRSCParity = rrSCparity + rrSCParity := scfg.GetParityForSC(storageclass.RRS) + storageInfo.Backend.RRSCData = s.drivesPerSet - rrSCParity + storageInfo.Backend.RRSCParity = rrSCParity storageInfo.Backend.Sets = make([][]madmin.DriveInfo, s.setCount) for i := range storageInfo.Backend.Sets { @@ -1028,7 +1034,7 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker objInfo.UserDefined = cleanMetadata(result.Metadata) // Update storage class - if sc, ok := result.Metadata[amzStorageClass]; ok { + if sc, ok := result.Metadata[xhttp.AmzStorageClass]; ok { objInfo.StorageClass = sc } else { objInfo.StorageClass = globalMinioDefaultStorageClass @@ -1171,7 +1177,7 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi objInfo.UserDefined = cleanMetadata(entry.Metadata) // Update storage class - if sc, ok := entry.Metadata[amzStorageClass]; ok { + if sc, ok := entry.Metadata[xhttp.AmzStorageClass]; ok { objInfo.StorageClass = sc } else { objInfo.StorageClass = globalMinioDefaultStorageClass diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 748b39b49..1aff7da11 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -27,9 +27,9 @@ import ( "sync" "time" - "github.com/minio/sha256-simd" - + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/sha256-simd" ) const erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde" @@ -252,7 +252,7 @@ func (m xlMetaV1) ToObjectInfo(bucket, object string) ObjectInfo { objInfo.Parts = m.Parts // Update storage class - if sc, ok := m.Meta[amzStorageClass]; ok { + if sc, ok := m.Meta[xhttp.AmzStorageClass]; ok { objInfo.StorageClass = sc } else { objInfo.StorageClass = globalMinioDefaultStorageClass @@ -481,3 +481,19 @@ func writeUniqueXLMetadata(ctx context.Context, disks []StorageAPI, bucket, pref err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum) return evalDisks(disks, mErrs), err } + +// Returns per object readQuorum and writeQuorum +// readQuorum is the min required disks to read data. +// writeQuorum is the min required disks to write data. +func objectQuorumFromMeta(ctx context.Context, xl xlObjects, partsMetaData []xlMetaV1, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) { + // get the latest updated Metadata and a count of all the latest updated xlMeta(s) + latestXLMeta, err := getLatestXLMeta(ctx, partsMetaData, errs) + + if err != nil { + return 0, 0, err + } + + // Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks + // from latestXLMeta to get the quorum + return latestXLMeta.Erasure.DataBlocks, latestXLMeta.Erasure.DataBlocks + 1, nil +} diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index c1ae07086..571cd6811 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -27,6 +27,7 @@ import ( "sync" "time" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/mimedb" ) @@ -188,7 +189,13 @@ func (xl xlObjects) ListMultipartUploads(ctx context.Context, bucket, object, ke // operation(s) on the object. func (xl xlObjects) newMultipartUpload(ctx context.Context, bucket string, object string, meta map[string]string) (string, error) { - dataBlocks, parityBlocks := getRedundancyCount(meta[amzStorageClass], len(xl.getDisks())) + onlineDisks := xl.getDisks() + scfg := globalServerConfig.GetStorageClass() + parityBlocks := scfg.GetParityForSC(meta[xhttp.AmzStorageClass]) + if parityBlocks == 0 { + parityBlocks = len(onlineDisks) / 2 + } + dataBlocks := len(onlineDisks) - parityBlocks xlMeta := newXLMetaV1(object, dataBlocks, parityBlocks) @@ -212,7 +219,6 @@ func (xl xlObjects) newMultipartUpload(ctx context.Context, bucket string, objec // success. defer xl.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath, writeQuorum, false) - onlineDisks := xl.getDisks() var partsMetadata = make([]xlMetaV1, len(onlineDisks)) for i := range onlineDisks { partsMetadata[i] = xlMeta diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index ca7d7abe9..0316f9daa 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -24,6 +24,7 @@ import ( "path" "sync" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/mimedb" ) @@ -524,8 +525,15 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, opts.UserDefined = make(map[string]string) } + storageDisks := xl.getDisks() + // Get parity and data drive count based on storage class metadata - dataDrives, parityDrives := getRedundancyCount(opts.UserDefined[amzStorageClass], len(xl.getDisks())) + scfg := globalServerConfig.GetStorageClass() + parityDrives := scfg.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass]) + if parityDrives == 0 { + parityDrives = len(storageDisks) / 2 + } + dataDrives := len(storageDisks) - parityDrives // we now know the number of blocks this object needs for data and parity. // writeQuorum is dataBlocks + 1 @@ -553,7 +561,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // Rename the successfully written temporary object to final location. Ignore errFileAccessDenied // error because it means that the target object dir exists and we want to be close to S3 specification. - if _, err = rename(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, []error{errFileAccessDenied}); err != nil { + if _, err = rename(ctx, storageDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, []error{errFileAccessDenied}); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -584,7 +592,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, } // Order disks according to erasure distribution - onlineDisks := shuffleDisks(xl.getDisks(), xlMeta.Erasure.Distribution) + onlineDisks := shuffleDisks(storageDisks, xlMeta.Erasure.Distribution) erasure, err := NewErasure(ctx, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) if err != nil { diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index dfbb90ee5..2993ed050 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -28,6 +28,7 @@ import ( "time" humanize "github.com/dustin/go-humanize" + "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/pkg/madmin" ) @@ -186,8 +187,6 @@ func TestXLDeleteObjectsXLSet(t *testing.T) { } func TestXLDeleteObjectDiskNotFound(t *testing.T) { - // Reset global storage class flags - resetGlobalStorageEnvs() // Create an instance of xl backend. obj, fsDirs, err := prepareXL16() if err != nil { @@ -444,3 +443,159 @@ func TestHealing(t *testing.T) { t.Fatal(err) } } + +func TestObjectQuorumFromMeta(t *testing.T) { + ExecObjectLayerTestWithDirs(t, testObjectQuorumFromMeta) +} + +func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { + bucket := getRandomBucketName() + + var opts ObjectOptions + // make data with more than one part + partCount := 3 + data := bytes.Repeat([]byte("a"), int(globalPutPartSize)*partCount) + xl := obj.(*xlObjects) + xlDisks := xl.storageDisks + + err := obj.MakeBucketWithLocation(context.Background(), bucket, globalMinioDefaultRegion) + if err != nil { + t.Fatalf("Failed to make a bucket %v", err) + } + + // Object for test case 1 - No StorageClass defined, no MetaData in PutObject + object1 := "object1" + _, err = obj.PutObject(context.Background(), bucket, object1, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts1, errs1 := readAllXLMetadata(context.Background(), xlDisks, bucket, object1) + + // Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class + object2 := "object2" + metadata2 := make(map[string]string) + metadata2["x-amz-storage-class"] = storageclass.RRS + _, err = obj.PutObject(context.Background(), bucket, object2, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata2}) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts2, errs2 := readAllXLMetadata(context.Background(), xlDisks, bucket, object2) + + // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class + object3 := "object3" + metadata3 := make(map[string]string) + metadata3["x-amz-storage-class"] = storageclass.STANDARD + _, err = obj.PutObject(context.Background(), bucket, object3, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata3}) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts3, errs3 := readAllXLMetadata(context.Background(), xlDisks, bucket, object3) + + // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class + object4 := "object4" + metadata4 := make(map[string]string) + metadata4["x-amz-storage-class"] = storageclass.STANDARD + globalServerConfig.StorageClass = storageclass.Config{ + Standard: storageclass.StorageClass{ + Parity: 6, + }, + } + + _, err = obj.PutObject(context.Background(), bucket, object4, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata4}) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts4, errs4 := readAllXLMetadata(context.Background(), xlDisks, bucket, object4) + + // Object for test case 5 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting RRS Class + // Reset global storage class flags + object5 := "object5" + metadata5 := make(map[string]string) + metadata5["x-amz-storage-class"] = storageclass.RRS + globalServerConfig.StorageClass = storageclass.Config{ + RRS: storageclass.StorageClass{ + Parity: 2, + }, + } + + _, err = obj.PutObject(context.Background(), bucket, object5, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata5}) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts5, errs5 := readAllXLMetadata(context.Background(), xlDisks, bucket, object5) + + // Object for test case 6 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting Standard Storage Class + object6 := "object6" + metadata6 := make(map[string]string) + metadata6["x-amz-storage-class"] = storageclass.STANDARD + globalServerConfig.StorageClass = storageclass.Config{ + RRS: storageclass.StorageClass{ + Parity: 2, + }, + } + + _, err = obj.PutObject(context.Background(), bucket, object6, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata6}) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts6, errs6 := readAllXLMetadata(context.Background(), xlDisks, bucket, object6) + + // Object for test case 7 - Standard StorageClass defined as Parity 5, MetaData in PutObject requesting RRS Class + // Reset global storage class flags + object7 := "object7" + metadata7 := make(map[string]string) + metadata7["x-amz-storage-class"] = storageclass.RRS + globalServerConfig.StorageClass = storageclass.Config{ + Standard: storageclass.StorageClass{ + Parity: 5, + }, + } + + _, err = obj.PutObject(context.Background(), bucket, object7, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata7}) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts7, errs7 := readAllXLMetadata(context.Background(), xlDisks, bucket, object7) + + tests := []struct { + parts []xlMetaV1 + errs []error + expectedReadQuorum int + expectedWriteQuorum int + expectedError error + }{ + {parts1, errs1, 8, 9, nil}, + {parts2, errs2, 14, 15, nil}, + {parts3, errs3, 8, 9, nil}, + {parts4, errs4, 10, 11, nil}, + {parts5, errs5, 14, 15, nil}, + {parts6, errs6, 8, 9, nil}, + {parts7, errs7, 14, 15, nil}, + } + for i, tt := range tests { + actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(context.Background(), *xl, tt.parts, tt.errs) + if tt.expectedError != nil && err == nil { + t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err) + return + } + if tt.expectedError == nil && err != nil { + t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err) + return + } + if tt.expectedReadQuorum != actualReadQuorum { + t.Errorf("Test %d, Expected Read Quorum %d, got %d", i+1, tt.expectedReadQuorum, actualReadQuorum) + return + } + if tt.expectedWriteQuorum != actualWriteQuorum { + t.Errorf("Test %d, Expected Write Quorum %d, got %d", i+1, tt.expectedWriteQuorum, actualWriteQuorum) + return + } + } +} diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 36becad40..8b746b2a1 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -144,9 +144,6 @@ func getStorageInfo(disks []StorageAPI) StorageInfo { available = available + di.Free } - _, sscParity := getRedundancyCount(standardStorageClass, len(disks)) - _, rrscparity := getRedundancyCount(reducedRedundancyStorageClass, len(disks)) - storageInfo := StorageInfo{ Used: used, Total: total, @@ -157,9 +154,6 @@ func getStorageInfo(disks []StorageAPI) StorageInfo { storageInfo.Backend.OnlineDisks = onlineDisks storageInfo.Backend.OfflineDisks = offlineDisks - storageInfo.Backend.StandardSCParity = sscParity - storageInfo.Backend.RRSCParity = rrscparity - return storageInfo } diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index bd10e5fc0..6805883f9 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -26,7 +26,6 @@ import ( "path/filepath" "sync" - "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" "github.com/streadway/amqp" @@ -72,11 +71,12 @@ func (a *AMQPArgs) Validate() error { // AMQPTarget - AMQP target type AMQPTarget struct { - id event.TargetID - args AMQPArgs - conn *amqp.Connection - connMutex sync.Mutex - store Store + id event.TargetID + args AMQPArgs + conn *amqp.Connection + connMutex sync.Mutex + store Store + loggerOnce func(ctx context.Context, err error, id interface{}) } // ID - returns TargetID. @@ -174,7 +174,7 @@ func (target *AMQPTarget) Save(eventData event.Event) error { } defer func() { cErr := ch.Close() - logger.LogOnceIf(context.Background(), cErr, target.ID()) + target.loggerOnce(context.Background(), cErr, target.ID()) }() return target.send(eventData, ch) @@ -188,7 +188,7 @@ func (target *AMQPTarget) Send(eventKey string) error { } defer func() { cErr := ch.Close() - logger.LogOnceIf(context.Background(), cErr, target.ID()) + target.loggerOnce(context.Background(), cErr, target.ID()) }() eventData, eErr := target.store.Get(eventKey) @@ -215,7 +215,7 @@ func (target *AMQPTarget) Close() error { } // NewAMQPTarget - creates new AMQP target. -func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}) (*AMQPTarget, error) { +func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*AMQPTarget, error) { var conn *amqp.Connection var err error @@ -237,10 +237,11 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}) (*AMQPTarge } target := &AMQPTarget{ - id: event.TargetID{ID: id, Name: "amqp"}, - args: args, - conn: conn, - store: store, + id: event.TargetID{ID: id, Name: "amqp"}, + args: args, + conn: conn, + store: store, + loggerOnce: loggerOnce, } if target.store != nil { diff --git a/pkg/event/target/redis.go b/pkg/event/target/redis.go index d0243ae7e..790d7e1be 100644 --- a/pkg/event/target/redis.go +++ b/pkg/event/target/redis.go @@ -28,7 +28,6 @@ import ( "time" "github.com/gomodule/redigo/redis" - "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" ) @@ -95,11 +94,12 @@ func (r RedisArgs) validateFormat(c redis.Conn) error { // RedisTarget - Redis target. type RedisTarget struct { - id event.TargetID - args RedisArgs - pool *redis.Pool - store Store - firstPing bool + id event.TargetID + args RedisArgs + pool *redis.Pool + store Store + firstPing bool + loggerOnce func(ctx context.Context, err error, id interface{}) } // ID - returns target ID. @@ -115,7 +115,7 @@ func (target *RedisTarget) Save(eventData event.Event) error { conn := target.pool.Get() defer func() { cErr := conn.Close() - logger.LogOnceIf(context.Background(), cErr, target.ID()) + target.loggerOnce(context.Background(), cErr, target.ID()) }() _, pingErr := conn.Do("PING") if pingErr != nil { @@ -132,7 +132,7 @@ func (target *RedisTarget) send(eventData event.Event) error { conn := target.pool.Get() defer func() { cErr := conn.Close() - logger.LogOnceIf(context.Background(), cErr, target.ID()) + target.loggerOnce(context.Background(), cErr, target.ID()) }() if target.args.Format == event.NamespaceFormat { @@ -175,7 +175,7 @@ func (target *RedisTarget) Send(eventKey string) error { conn := target.pool.Get() defer func() { cErr := conn.Close() - logger.LogOnceIf(context.Background(), cErr, target.ID()) + target.loggerOnce(context.Background(), cErr, target.ID()) }() _, pingErr := conn.Do("PING") if pingErr != nil { @@ -222,7 +222,7 @@ func (target *RedisTarget) Close() error { } // NewRedisTarget - creates new Redis target. -func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTarget, error) { +func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*RedisTarget, error) { pool := &redis.Pool{ MaxIdle: 3, IdleTimeout: 2 * 60 * time.Second, @@ -239,7 +239,7 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTa if _, err = conn.Do("AUTH", args.Password); err != nil { cErr := conn.Close() targetID := event.TargetID{ID: id, Name: "redis"} - logger.LogOnceIf(context.Background(), cErr, targetID.String()) + loggerOnce(context.Background(), cErr, targetID) return nil, err } @@ -262,16 +262,17 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTa } target := &RedisTarget{ - id: event.TargetID{ID: id, Name: "redis"}, - args: args, - pool: pool, - store: store, + id: event.TargetID{ID: id, Name: "redis"}, + args: args, + pool: pool, + store: store, + loggerOnce: loggerOnce, } conn := target.pool.Get() defer func() { cErr := conn.Close() - logger.LogOnceIf(context.Background(), cErr, target.ID()) + target.loggerOnce(context.Background(), cErr, target.ID()) }() _, pingErr := conn.Do("PING") diff --git a/pkg/iam/openid/jwt.go b/pkg/iam/openid/jwt.go index b3febc1d0..4e0574178 100644 --- a/pkg/iam/openid/jwt.go +++ b/pkg/iam/openid/jwt.go @@ -18,37 +18,41 @@ package openid import ( "crypto" - "crypto/tls" "encoding/json" "errors" "fmt" - "net" + "io" "net/http" "strconv" "time" jwtgo "github.com/dgrijalva/jwt-go" + "github.com/minio/minio/pkg/env" xnet "github.com/minio/minio/pkg/net" ) // JWKSArgs - RSA authentication target arguments type JWKSArgs struct { - URL *xnet.URL `json:"url"` - publicKeys map[string]crypto.PublicKey + URL *xnet.URL `json:"url"` + publicKeys map[string]crypto.PublicKey + transport *http.Transport + closeRespFn func(io.ReadCloser) } // PopulatePublicKey - populates a new publickey from the JWKS URL. func (r *JWKSArgs) PopulatePublicKey() error { - insecureClient := &http.Client{Transport: newCustomHTTPTransport(true)} - client := &http.Client{Transport: newCustomHTTPTransport(false)} + if r.URL == nil { + return nil + } + client := &http.Client{} + if r.transport != nil { + client.Transport = r.transport + } resp, err := client.Get(r.URL.String()) if err != nil { - resp, err = insecureClient.Get(r.URL.String()) - if err != nil { - return err - } + return err } - defer resp.Body.Close() + defer r.closeRespFn(resp.Body) if resp.StatusCode != http.StatusOK { return errors.New(resp.Status) } @@ -133,27 +137,6 @@ func GetDefaultExpiration(dsecs string) (time.Duration, error) { return defaultExpiryDuration, nil } -// newCustomHTTPTransport returns a new http configuration -// used while communicating with the cloud backends. -// This sets the value for MaxIdleConnsPerHost from 2 (go default) -// to 100. -func newCustomHTTPTransport(insecure bool) *http.Transport { - return &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - IdleConnTimeout: 30 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}, - DisableCompression: true, - } -} - // Validate - validates the access token. func (p *JWT) Validate(token, dsecs string) (map[string]interface{}, error) { jp := new(jwtgo.Parser) @@ -211,6 +194,34 @@ func (p *JWT) ID() ID { return "jwt" } +// JWKS url +const ( + EnvIAMJWKSURL = "MINIO_IAM_JWKS_URL" +) + +// LookupConfig lookup jwks from config, override with any ENVs. +func LookupConfig(args JWKSArgs, transport *http.Transport, closeRespFn func(io.ReadCloser)) (JWKSArgs, error) { + var urlStr string + if args.URL != nil { + urlStr = args.URL.String() + } + + jwksURL := env.Get(EnvIAMJWKSURL, urlStr) + if jwksURL == "" { + return args, nil + } + + u, err := xnet.ParseURL(jwksURL) + if err != nil { + return args, err + } + args.URL = u + if err := args.PopulatePublicKey(); err != nil { + return args, err + } + return args, nil +} + // NewJWT - initialize new jwt authenticator. func NewJWT(args JWKSArgs) *JWT { return &JWT{ diff --git a/pkg/iam/policy/opa.go b/pkg/iam/policy/opa.go index d8b3faa8f..e16c195b2 100644 --- a/pkg/iam/policy/opa.go +++ b/pkg/iam/policy/opa.go @@ -23,9 +23,16 @@ import ( "io/ioutil" "net/http" + "github.com/minio/minio/pkg/env" xnet "github.com/minio/minio/pkg/net" ) +// Env IAM OPA URL +const ( + EnvIAMOPAURL = "MINIO_IAM_OPA_URL" + EnvIAMOPAAuthToken = "MINIO_IAM_OPA_AUTHTOKEN" +) + // OpaArgs opa general purpose policy engine configuration. type OpaArgs struct { URL *xnet.URL `json:"url"` @@ -82,10 +89,36 @@ type Opa struct { client *http.Client } +// LookupConfig lookup Opa from config, override with any ENVs. +func LookupConfig(args OpaArgs, transport *http.Transport, closeRespFn func(io.ReadCloser)) (OpaArgs, error) { + var urlStr string + if args.URL != nil { + urlStr = args.URL.String() + } + opaURL := env.Get(EnvIAMOPAURL, urlStr) + if opaURL == "" { + return args, nil + } + u, err := xnet.ParseURL(opaURL) + if err != nil { + return args, err + } + args = OpaArgs{ + URL: u, + AuthToken: env.Get(EnvIAMOPAAuthToken, ""), + Transport: transport, + CloseRespFn: closeRespFn, + } + if err = args.Validate(); err != nil { + return args, err + } + return args, nil +} + // NewOpa - initializes opa policy engine connector. func NewOpa(args OpaArgs) *Opa { // No opa args. - if args.URL == nil && args.AuthToken == "" { + if args.URL == nil || args.URL.Scheme == "" && args.AuthToken == "" { return nil } return &Opa{