Remove safe mode for invalid entries in config (#8650)

The approach is that now safe mode is only invoked when
we cannot read the config or under some catastrophic
situations, but not under situations when config entries
are invalid or unreachable. This allows for maximum
availability for MinIO and not fail on our users unlike
most of our historical releases.
master
Harshavardhana 5 years ago committed by GitHub
parent c10ecacf91
commit 1dc5f2d0af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      cmd/common-main.go
  2. 78
      cmd/config-current.go
  3. 61
      cmd/config/config.go
  4. 3
      cmd/config/etcd/etcd.go
  5. 4
      cmd/config/notify/parse.go
  6. 12
      cmd/gateway-main.go
  7. 30
      cmd/server-main.go

@ -213,6 +213,11 @@ func handleCommonEnvVars() {
globalActiveCred = cred
globalConfigEncrypted = true
}
globalWORMEnabled, err = config.LookupWorm()
if err != nil {
logger.Fatal(config.ErrInvalidWormValue(err), "Invalid worm configuration")
}
}
func logStartupMessage(msg string) {

@ -291,23 +291,28 @@ func validateConfig(s config.Config) error {
globalNotificationSys.ConfiguredTargetIDs())
}
func lookupConfigs(s config.Config) (err error) {
func lookupConfigs(s config.Config) {
ctx := context.Background()
var err error
if !globalActiveCred.IsValid() {
// Env doesn't seem to be set, we fallback to lookup creds from the config.
globalActiveCred, err = config.LookupCreds(s[config.CredentialsSubSys][config.Default])
if err != nil {
return fmt.Errorf("Invalid credentials configuration: %w", err)
logger.LogIf(ctx, fmt.Errorf("Invalid credentials configuration: %w", err))
}
}
etcdCfg, err := xetcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs)
if err != nil {
return fmt.Errorf("Unable to initialize etcd config: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to initialize etcd config: %w", err))
}
globalEtcdClient, err = xetcd.New(etcdCfg)
if err != nil {
return fmt.Errorf("Unable to initialize etcd config: %w", err)
if etcdCfg.Enabled {
globalEtcdClient, err = xetcd.New(etcdCfg)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize etcd config: %w", err))
}
}
if len(globalDomainNames) != 0 && !globalDomainIPs.IsEmpty() && globalEtcdClient != nil {
@ -318,51 +323,46 @@ func lookupConfigs(s config.Config) (err error) {
dns.CoreDNSPath(etcdCfg.CoreDNSPath),
)
if err != nil {
return config.Errorf(config.SafeModeKind,
"Unable to initialize DNS config for %s: %s", globalDomainNames, err)
logger.LogIf(ctx, fmt.Errorf("Unable to initialize DNS config for %s: %w",
globalDomainNames, err))
}
}
globalServerRegion, err = config.LookupRegion(s[config.RegionSubSys][config.Default])
if err != nil {
return fmt.Errorf("Invalid region configuration: %w", err)
}
globalWORMEnabled, err = config.LookupWorm()
if err != nil {
return fmt.Errorf("Invalid worm configuration: %w", err)
logger.LogIf(ctx, fmt.Errorf("Invalid region configuration: %w", err))
}
if globalIsXL {
globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default],
globalXLSetDriveCount)
if err != nil {
return fmt.Errorf("Unable to initialize storage class config: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to initialize storage class config: %w", err))
}
}
globalCacheConfig, err = cache.LookupConfig(s[config.CacheSubSys][config.Default])
if err != nil {
return fmt.Errorf("Unable to setup cache: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to setup cache: %w", err))
}
if globalCacheConfig.Enabled {
if cacheEncKey := env.Get(cache.EnvCacheEncryptionMasterKey, ""); cacheEncKey != "" {
globalCacheKMS, err = crypto.ParseMasterKey(cacheEncKey)
if err != nil {
return fmt.Errorf("Unable to setup encryption cache: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to setup encryption cache: %w", err))
}
}
}
kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get())
if err != nil {
return fmt.Errorf("Unable to setup KMS config: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to setup KMS config: %w", err))
}
GlobalKMS, err = crypto.NewKMS(kmsCfg)
if err != nil {
return fmt.Errorf("Unable to setup KMS with current KMS config: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to setup KMS with current KMS config: %w", err))
}
// Enable auto-encryption if enabled
@ -370,19 +370,19 @@ func lookupConfigs(s config.Config) (err error) {
globalCompressConfig, err = compress.LookupConfig(s[config.CompressionSubSys][config.Default])
if err != nil {
return fmt.Errorf("Unable to setup Compression: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to setup Compression: %w", err))
}
globalOpenIDConfig, err = openid.LookupConfig(s[config.IdentityOpenIDSubSys][config.Default],
NewCustomHTTPTransport(), xhttp.DrainBody)
if err != nil {
return fmt.Errorf("Unable to initialize OpenID: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to initialize OpenID: %w", err))
}
opaCfg, err := opa.LookupConfig(s[config.PolicyOPASubSys][config.Default],
NewCustomHTTPTransport(), xhttp.DrainBody)
if err != nil {
return fmt.Errorf("Unable to initialize OPA: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to initialize OPA: %w", err))
}
globalOpenIDValidators = getOpenIDValidators(globalOpenIDConfig)
@ -391,7 +391,7 @@ func lookupConfigs(s config.Config) (err error) {
globalLDAPConfig, err = xldap.Lookup(s[config.IdentityLDAPSubSys][config.Default],
globalRootCAs)
if err != nil {
return fmt.Errorf("Unable to parse LDAP configuration: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to parse LDAP configuration: %w", err))
}
// Load logger targets based on user's configuration
@ -399,7 +399,7 @@ func lookupConfigs(s config.Config) (err error) {
loggerCfg, err := logger.LookupConfig(s)
if err != nil {
return fmt.Errorf("Unable to initialize logger: %w", err)
logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger: %w", err))
}
for _, l := range loggerCfg.HTTP {
@ -415,11 +415,6 @@ func lookupConfigs(s config.Config) (err error) {
logger.AddAuditTarget(http.New(l.Endpoint, loggerUserAgent, string(logger.All), NewCustomHTTPTransport()))
}
}
// Enable console logging
logger.AddTarget(globalConsoleSys.Console())
return nil
}
// Help - return sub-system level help
@ -437,32 +432,25 @@ func GetHelp(subSys, key string, envOnly bool) (Help, error) {
}
subSystemValue := strings.SplitN(subSys, config.SubSystemSeparator, 2)
if len(subSystemValue) == 0 {
return Help{}, config.Errorf(
config.SafeModeKind,
"invalid number of arguments %s", subSys)
return Help{}, config.Errorf("invalid number of arguments %s", subSys)
}
subSys = subSystemValue[0]
subSysHelp, ok := config.HelpSubSysMap[""].Lookup(subSys)
if !ok {
return Help{}, config.Errorf(
config.SafeModeKind,
"unknown sub-system %s", subSys)
return Help{}, config.Errorf("unknown sub-system %s", subSys)
}
h, ok := config.HelpSubSysMap[subSys]
if !ok {
return Help{}, config.Errorf(
config.SafeModeKind,
"unknown sub-system %s", subSys)
return Help{}, config.Errorf("unknown sub-system %s", subSys)
}
if key != "" {
value, ok := h.Lookup(key)
if !ok {
return Help{}, config.Errorf(
config.SafeModeKind,
"unknown key %s for sub-system %s", key, subSys)
return Help{}, config.Errorf("unknown key %s for sub-system %s",
key, subSys)
}
h = config.HelpKVS{value}
}
@ -502,9 +490,7 @@ func newSrvConfig(objAPI ObjectLayer) error {
srvCfg := newServerConfig()
// Override any values from ENVs.
if err := lookupConfigs(srvCfg); err != nil {
return err
}
lookupConfigs(srvCfg)
// hold the mutex lock before a new config is assigned.
globalServerConfigMu.Lock()
@ -528,9 +514,7 @@ func loadConfig(objAPI ObjectLayer) error {
}
// Override any values from ENVs.
if err = lookupConfigs(srvCfg); err != nil {
return err
}
lookupConfigs(srvCfg)
// hold the mutex lock before a new config is assigned.
globalServerConfigMu.Lock()

@ -32,23 +32,13 @@ import (
// Error config error type
type Error struct {
Kind ErrorKind
Err string
Err string
}
// ErrorKind config error kind
type ErrorKind int8
// Various error kinds.
const (
ContinueKind ErrorKind = iota + 1
SafeModeKind
)
// Errorf - formats according to a format specifier and returns
// the string as a value that satisfies error of type config.Error
func Errorf(errKind ErrorKind, format string, a ...interface{}) error {
return Error{Kind: errKind, Err: fmt.Sprintf(format, a...)}
func Errorf(format string, a ...interface{}) error {
return Error{Err: fmt.Sprintf(format, a...)}
}
func (e Error) Error() string {
@ -366,9 +356,9 @@ func LookupCreds(kv KVS) (auth.Credentials, error) {
if err := CheckValidKeys(CredentialsSubSys, kv, DefaultCredentialKVS); err != nil {
return auth.Credentials{}, err
}
accessKey := env.Get(EnvAccessKey, kv.Get(AccessKey))
secretKey := env.Get(EnvSecretKey, kv.Get(SecretKey))
if accessKey == "" && secretKey == "" {
accessKey := kv.Get(AccessKey)
secretKey := kv.Get(SecretKey)
if accessKey == "" || secretKey == "" {
accessKey = auth.DefaultAccessKey
secretKey = auth.DefaultSecretKey
}
@ -390,7 +380,7 @@ func LookupRegion(kv KVS) (string, error) {
if validRegionRegex.MatchString(region) {
return region, nil
}
return "", Errorf(SafeModeKind,
return "", Errorf(
"region '%s' is invalid, expected simple characters such as [us-east-1, myregion...]",
region)
}
@ -414,7 +404,6 @@ func CheckValidKeys(subSys string, kv KVS, validKVS KVS) error {
}
if len(nkv) > 0 {
return Errorf(
ContinueKind,
"found invalid keys (%s) for '%s' sub-system, use 'mc admin config reset myminio %s' to fix invalid keys", nkv.String(), subSys, subSys)
}
return nil
@ -447,15 +436,15 @@ type Targets []Target
// GetKVS - get kvs from specific subsystem.
func (c Config) GetKVS(s string, defaultKVS map[string]KVS) (Targets, error) {
if len(s) == 0 {
return nil, Errorf(SafeModeKind, "input cannot be empty")
return nil, Errorf("input cannot be empty")
}
inputs := strings.Fields(s)
if len(inputs) > 1 {
return nil, Errorf(SafeModeKind, "invalid number of arguments %s", s)
return nil, Errorf("invalid number of arguments %s", s)
}
subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2)
if len(subSystemValue) == 0 {
return nil, Errorf(SafeModeKind, "invalid number of arguments %s", s)
return nil, Errorf("invalid number of arguments %s", s)
}
found := SubSystems.Contains(subSystemValue[0])
if !found {
@ -464,18 +453,18 @@ func (c Config) GetKVS(s string, defaultKVS map[string]KVS) (Targets, error) {
found = !SubSystems.FuncMatch(strings.HasPrefix, subSystemValue[0]).IsEmpty() && len(subSystemValue) == 1
}
if !found {
return nil, Errorf(SafeModeKind, "unknown sub-system %s", s)
return nil, Errorf("unknown sub-system %s", s)
}
targets := Targets{}
subSysPrefix := subSystemValue[0]
if len(subSystemValue) == 2 {
if len(subSystemValue[1]) == 0 {
return nil, Errorf(SafeModeKind, "sub-system target '%s' cannot be empty", s)
return nil, Errorf("sub-system target '%s' cannot be empty", s)
}
kvs, ok := c[subSysPrefix][subSystemValue[1]]
if !ok {
return nil, Errorf(SafeModeKind, "sub-system target '%s' doesn't exist", s)
return nil, Errorf("sub-system target '%s' doesn't exist", s)
}
for _, kv := range defaultKVS[subSysPrefix] {
_, ok = kvs.Lookup(kv.Key)
@ -527,15 +516,15 @@ func (c Config) GetKVS(s string, defaultKVS map[string]KVS) (Targets, error) {
// DelKVS - delete a specific key.
func (c Config) DelKVS(s string) error {
if len(s) == 0 {
return Errorf(SafeModeKind, "input arguments cannot be empty")
return Errorf("input arguments cannot be empty")
}
inputs := strings.Fields(s)
if len(inputs) > 1 {
return Errorf(SafeModeKind, "invalid number of arguments %s", s)
return Errorf("invalid number of arguments %s", s)
}
subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2)
if len(subSystemValue) == 0 {
return Errorf(SafeModeKind, "invalid number of arguments %s", s)
return Errorf("invalid number of arguments %s", s)
}
if !SubSystems.Contains(subSystemValue[0]) {
// Unknown sub-system found try to remove it anyways.
@ -546,13 +535,13 @@ func (c Config) DelKVS(s string) error {
subSys := subSystemValue[0]
if len(subSystemValue) == 2 {
if len(subSystemValue[1]) == 0 {
return Errorf(SafeModeKind, "sub-system target '%s' cannot be empty", s)
return Errorf("sub-system target '%s' cannot be empty", s)
}
tgt = subSystemValue[1]
}
_, ok := c[subSys][tgt]
if !ok {
return Errorf(SafeModeKind, "sub-system %s already deleted", s)
return Errorf("sub-system %s already deleted", s)
}
delete(c[subSys], tgt)
return nil
@ -573,23 +562,23 @@ func (c Config) Clone() Config {
// SetKVS - set specific key values per sub-system.
func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
if len(s) == 0 {
return Errorf(SafeModeKind, "input arguments cannot be empty")
return Errorf("input arguments cannot be empty")
}
inputs := strings.SplitN(s, KvSpaceSeparator, 2)
if len(inputs) <= 1 {
return Errorf(SafeModeKind, "invalid number of arguments '%s'", s)
return Errorf("invalid number of arguments '%s'", s)
}
subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2)
if len(subSystemValue) == 0 {
return Errorf(SafeModeKind, "invalid number of arguments %s", s)
return Errorf("invalid number of arguments %s", s)
}
if !SubSystems.Contains(subSystemValue[0]) {
return Errorf(SafeModeKind, "unknown sub-system %s", s)
return Errorf("unknown sub-system %s", s)
}
if SubSystemsSingleTargets.Contains(subSystemValue[0]) && len(subSystemValue) == 2 {
return Errorf(SafeModeKind, "sub-system '%s' only supports single target", subSystemValue[0])
return Errorf("sub-system '%s' only supports single target", subSystemValue[0])
}
var kvs = KVS{}
@ -612,7 +601,7 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
kvs.Set(prevK, madmin.SanitizeValue(kv[1]))
continue
}
return Errorf(SafeModeKind, "key '%s', cannot have empty value", kv[0])
return Errorf("key '%s', cannot have empty value", kv[0])
}
tgt := Default
@ -662,7 +651,7 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
// Return error only if the
// key is enabled, for state=off
// let it be empty.
return Errorf(SafeModeKind,
return Errorf(
"'%s' is not optional for '%s' sub-system, please check '%s' documentation",
hkv.Key, subSys, subSys)
}

@ -109,8 +109,7 @@ func parseEndpoints(endpoints string) ([]string, bool, error) {
return nil, false, err
}
if etcdSecure && u.Scheme == "http" {
return nil, false, config.Errorf(config.SafeModeKind,
"all endpoints should be https or http: %s", endpoint)
return nil, false, config.Errorf("all endpoints should be https or http: %s", endpoint)
}
// If one of the endpoint is https, we will use https directly.
etcdSecure = etcdSecure || u.Scheme == "https"

@ -283,7 +283,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
// notification targets, based on their target IDs
for _, targetID := range targetIDs {
if !targetList.Exists(targetID) {
return nil, config.Errorf(config.SafeModeKind,
return nil, config.Errorf(
"Unable to disable configured targets '%v'",
targetID)
}
@ -423,7 +423,7 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
}
kafkaBrokers := env.Get(brokersEnv, kv.Get(target.KafkaBrokers))
if len(kafkaBrokers) == 0 {
return nil, config.Errorf(config.SafeModeKind, "kafka 'brokers' cannot be empty")
return nil, config.Errorf("kafka 'brokers' cannot be empty")
}
for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) {
var host *xnet.Host

@ -38,6 +38,7 @@ import (
func init() {
logger.Init(GOPATH, GOROOT)
logger.RegisterError(config.FmtError)
logger.AddTarget(globalConsoleSys.Console())
}
var (
@ -104,10 +105,6 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
logger.FatalIf(errUnexpected, "Gateway implementation not initialized")
}
// Disable logging until gateway initialization is complete, any
// error during initialization will be shown as a fatal message
logger.Disable = true
// Validate if we have access, secret set through environment.
globalGatewayName = gw.Name()
gatewayName := gw.Name()
@ -161,9 +158,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
srvCfg := newServerConfig()
// Override any values from ENVs.
if err := lookupConfigs(srvCfg); err != nil {
logger.FatalIf(err, "Unable to initialize server config")
}
lookupConfigs(srvCfg)
// hold the mutex lock before a new config is assigned.
globalServerConfigMu.Lock()
@ -228,9 +223,6 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
}
newObject = NewGatewayLayerWithLocker(newObject)
// Re-enable logging
logger.Disable = false
// Once endpoints are finalized, initialize the new object api in safe mode.
globalObjLayerMutex.Lock()
globalSafeMode = true

@ -40,6 +40,7 @@ import (
func init() {
logger.Init(GOPATH, GOROOT)
logger.RegisterError(config.FmtError)
logger.AddTarget(globalConsoleSys.Console())
gob.Register(VerifyFileError(""))
gob.Register(DeleteFileError(""))
}
@ -195,7 +196,7 @@ func newAllSubsystems() {
globalLifecycleSys = NewLifecycleSys()
}
func initSafeModeInit(buckets []BucketInfo) (err error) {
func initSafeMode(buckets []BucketInfo) (err error) {
newObject := newObjectLayerWithoutSafeModeFn()
// Construct path to config/transaction.lock for locking
@ -220,19 +221,6 @@ func initSafeModeInit(buckets []BucketInfo) (err error) {
return
}
var cfgErr config.Error
if errors.As(err, &cfgErr) {
if cfgErr.Kind == config.ContinueKind {
// print the error and continue
logger.Info("Config validation failed '%s' the sub-system is turned-off and all other sub-systems", err)
err = nil
return
}
}
// Enable logger
logger.Disable = false
// Prints the formatted startup message in safe mode operation.
printStartupSafeModeMessage(getAPIEndpoints(), err)
@ -242,10 +230,11 @@ func initSafeModeInit(buckets []BucketInfo) (err error) {
}
}(objLock)
// Calls New() for all sub-systems.
// Calls New() and initializes all sub-systems.
newAllSubsystems()
// Migrate all backend configs to encrypted backend, also handles rotation as well.
// Migrate all backend configs to encrypted backend configs, optionally
// handles rotating keys for encryption.
if err = handleEncryptedConfigBackend(newObject, true); err != nil {
return fmt.Errorf("Unable to handle encrypted backend for config, iam and policies: %w", err)
}
@ -311,10 +300,6 @@ func serverMain(ctx *cli.Context) {
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM)
// Disable logging until server initialization is complete, any
// error during initialization will be shown as a fatal message
logger.Disable = true
// Handle all server command args.
serverHandleCmdArgs(ctx)
@ -405,9 +390,6 @@ func serverMain(ctx *cli.Context) {
logger.Fatal(err, "Unable to initialize backend")
}
// Re-enable logging
logger.Disable = false
// Once endpoints are finalized, initialize the new object api in safe mode.
globalObjLayerMutex.Lock()
globalSafeMode = true
@ -424,7 +406,7 @@ func serverMain(ctx *cli.Context) {
initFederatorBackend(buckets, newObject)
}
logger.FatalIf(initSafeModeInit(buckets), "Unable to initialize server")
logger.FatalIf(initSafeMode(buckets), "Unable to initialize server switching into safe-mode")
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.

Loading…
Cancel
Save