diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 8caa8f5c4..8a11ff0ee 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -176,10 +176,7 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) { // Init global heal state initAllHealState(globalIsXL) - globalNotificationSys, err = NewNotificationSys(globalServerConfig, globalEndpoints) - if err != nil { - return nil, err - } + globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints) // Create new policy system. globalPolicySys = NewPolicySys() diff --git a/cmd/admin-rpc-client.go.orig b/cmd/admin-rpc-client.go.orig deleted file mode 100644 index a7ccdc770..000000000 --- a/cmd/admin-rpc-client.go.orig +++ /dev/null @@ -1,650 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2014, 2015, 2016, 2017, 2018 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "context" - "encoding/json" - "fmt" - "net" - "os" - "path" - "path/filepath" - "sort" - "sync" - "time" - - "github.com/minio/minio-go/pkg/set" - "github.com/minio/minio/cmd/logger" -) - -const ( - // Admin service names - signalServiceRPC = "Admin.SignalService" - reInitFormatRPC = "Admin.ReInitFormat" - listLocksRPC = "Admin.ListLocks" - serverInfoDataRPC = "Admin.ServerInfoData" - getConfigRPC = "Admin.GetConfig" - writeTmpConfigRPC = "Admin.WriteTmpConfig" - commitConfigRPC = "Admin.CommitConfig" -) - -// localAdminClient - represents admin operation to be executed locally. -type localAdminClient struct { -} - -// remoteAdminClient - represents admin operation to be executed -// remotely, via RPC. -type remoteAdminClient struct { - *AuthRPCClient -} - -// adminCmdRunner - abstracts local and remote execution of admin -// commands like service stop and service restart. -type adminCmdRunner interface { - SignalService(s serviceSignal) error - ReInitFormat(dryRun bool) error - ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) - ServerInfoData() (ServerInfoData, error) - GetConfig() ([]byte, error) - WriteTmpConfig(tmpFileName string, configBytes []byte) error - CommitConfig(tmpFileName string) error -} - -var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported") - -// SignalService - sends a restart or stop signal to the local server -func (lc localAdminClient) SignalService(s serviceSignal) error { - switch s { - case serviceRestart, serviceStop: - globalServiceSignalCh <- s - default: - return errUnsupportedSignal - } - return nil -} - -// ReInitFormat - re-initialize disk format. -func (lc localAdminClient) ReInitFormat(dryRun bool) error { - objectAPI := newObjectLayerFn() - if objectAPI == nil { - return errServerNotInitialized - } - return objectAPI.ReloadFormat(context.Background(), dryRun) -} - -// ListLocks - Fetches lock information from local lock instrumentation. -func (lc localAdminClient) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) { - // check if objectLayer is initialized, if not return. - objectAPI := newObjectLayerFn() - if objectAPI == nil { - return nil, errServerNotInitialized - } - return objectAPI.ListLocks(context.Background(), bucket, prefix, duration) -} - -func (rc remoteAdminClient) SignalService(s serviceSignal) (err error) { - switch s { - case serviceRestart, serviceStop: - reply := AuthRPCReply{} - err = rc.Call(signalServiceRPC, &SignalServiceArgs{Sig: s}, - &reply) - default: - err = errUnsupportedSignal - } - return err -} - -// ReInitFormat - re-initialize disk format, remotely. -func (rc remoteAdminClient) ReInitFormat(dryRun bool) error { - reply := AuthRPCReply{} - return rc.Call(reInitFormatRPC, &ReInitFormatArgs{ - DryRun: dryRun, - }, &reply) -} - -// ListLocks - Sends list locks command to remote server via RPC. -func (rc remoteAdminClient) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) { - listArgs := ListLocksQuery{ - Bucket: bucket, - Prefix: prefix, - Duration: duration, - } - var reply ListLocksReply - if err := rc.Call(listLocksRPC, &listArgs, &reply); err != nil { - return nil, err - } - return reply.VolLocks, nil -} - -// ServerInfoData - Returns the server info of this server. -func (lc localAdminClient) ServerInfoData() (sid ServerInfoData, e error) { - if globalBootTime.IsZero() { - return sid, errServerNotInitialized - } - - // Build storage info - objLayer := newObjectLayerFn() - if objLayer == nil { - return sid, errServerNotInitialized - } - storage := objLayer.StorageInfo(context.Background()) - - return ServerInfoData{ - StorageInfo: storage, - ConnStats: globalConnStats.toServerConnStats(), - HTTPStats: globalHTTPStats.toServerHTTPStats(), - Properties: ServerProperties{ - Uptime: UTCNow().Sub(globalBootTime), - Version: Version, - CommitID: CommitID, - SQSARN: globalNotificationSys.GetARNList(), - Region: globalServerConfig.GetRegion(), - }, - }, nil -} - -// ServerInfo - returns the server info of the server to which the RPC call is made. -func (rc remoteAdminClient) ServerInfoData() (sid ServerInfoData, e error) { - args := AuthRPCArgs{} - reply := ServerInfoDataReply{} - err := rc.Call(serverInfoDataRPC, &args, &reply) - if err != nil { - return sid, err - } - - return reply.ServerInfoData, nil -} - -// GetConfig - returns config.json of the local server. -func (lc localAdminClient) GetConfig() ([]byte, error) { - if globalServerConfig == nil { - return nil, fmt.Errorf("config not present") - } - - return json.Marshal(globalServerConfig) -} - -// GetConfig - returns config.json of the remote server. -func (rc remoteAdminClient) GetConfig() ([]byte, error) { - args := AuthRPCArgs{} - reply := ConfigReply{} - if err := rc.Call(getConfigRPC, &args, &reply); err != nil { - return nil, err - } - return reply.Config, nil -} - -// WriteTmpConfig - writes config file content to a temporary file on -// the local server. -func (lc localAdminClient) WriteTmpConfig(tmpFileName string, configBytes []byte) error { - return writeTmpConfigCommon(tmpFileName, configBytes) -} - -// WriteTmpConfig - writes config file content to a temporary file on -// a remote node. -func (rc remoteAdminClient) WriteTmpConfig(tmpFileName string, configBytes []byte) error { - wArgs := WriteConfigArgs{ - TmpFileName: tmpFileName, - Buf: configBytes, - } - - err := rc.Call(writeTmpConfigRPC, &wArgs, &WriteConfigReply{}) - if err != nil { - logger.LogIf(context.Background(), err) - return err - } - - return nil -} - -// CommitConfig - Move the new config in tmpFileName onto config.json -// on a local node. -func (lc localAdminClient) CommitConfig(tmpFileName string) error { - configFile := getConfigFile() - tmpConfigFile := filepath.Join(getConfigDir(), tmpFileName) - - err := os.Rename(tmpConfigFile, configFile) - reqInfo := (&logger.ReqInfo{}).AppendTags("tmpConfigFile", tmpConfigFile) - reqInfo.AppendTags("configFile", configFile) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - return err -} - -// CommitConfig - Move the new config in tmpFileName onto config.json -// on a remote node. -func (rc remoteAdminClient) CommitConfig(tmpFileName string) error { - cArgs := CommitConfigArgs{ - FileName: tmpFileName, - } - cReply := CommitConfigReply{} - err := rc.Call(commitConfigRPC, &cArgs, &cReply) - if err != nil { - logger.LogIf(context.Background(), err) - return err - } - - return nil -} - -// adminPeer - represents an entity that implements admin API RPCs. -type adminPeer struct { - addr string - cmdRunner adminCmdRunner - isLocal bool -} - -// type alias for a collection of adminPeer. -type adminPeers []adminPeer - -// makeAdminPeers - helper function to construct a collection of adminPeer. -func makeAdminPeers(endpoints EndpointList) (adminPeerList adminPeers) { - thisPeer := globalMinioAddr - if globalMinioHost == "" { - // When host is not explicitly provided simply - // use the first IPv4. - thisPeer = net.JoinHostPort(sortIPs(localIP4.ToSlice())[0], globalMinioPort) - } - adminPeerList = append(adminPeerList, adminPeer{ - thisPeer, - localAdminClient{}, - true, - }) - - hostSet := set.CreateStringSet(globalMinioAddr) - cred := globalServerConfig.GetCredential() - serviceEndpoint := path.Join(minioReservedBucketPath, adminPath) - for _, host := range GetRemotePeers(endpoints) { - if hostSet.Contains(host) { - continue - } - hostSet.Add(host) - adminPeerList = append(adminPeerList, adminPeer{ - addr: host, - cmdRunner: &remoteAdminClient{newAuthRPCClient(authConfig{ - accessKey: cred.AccessKey, - secretKey: cred.SecretKey, - serverAddr: host, - serviceEndpoint: serviceEndpoint, - secureConn: globalIsSSL, - serviceName: "Admin", - })}, - }) - } - - return adminPeerList -} - -// peersReInitFormat - reinitialize remote object layers to new format. -func peersReInitFormat(peers adminPeers, dryRun bool) error { - errs := make([]error, len(peers)) - - // Send ReInitFormat RPC call to all nodes. - // for local adminPeer this is a no-op. - wg := sync.WaitGroup{} - for i, peer := range peers { - wg.Add(1) - go func(idx int, peer adminPeer) { - defer wg.Done() - if !peer.isLocal { - errs[idx] = peer.cmdRunner.ReInitFormat(dryRun) - } - }(i, peer) - } - wg.Wait() - return nil -} - -// Initialize global adminPeer collection. -func initGlobalAdminPeers(endpoints EndpointList) { - globalAdminPeers = makeAdminPeers(endpoints) -} - -// invokeServiceCmd - Invoke Restart/Stop command. -func invokeServiceCmd(cp adminPeer, cmd serviceSignal) (err error) { - switch cmd { - case serviceRestart, serviceStop: - err = cp.cmdRunner.SignalService(cmd) - } - return err -} - -// sendServiceCmd - Invoke Restart command on remote peers -// adminPeer followed by on the local peer. -func sendServiceCmd(cps adminPeers, cmd serviceSignal) { - // Send service command like stop or restart to all remote nodes and finally run on local node. - errs := make([]error, len(cps)) - var wg sync.WaitGroup - remotePeers := cps[1:] - for i := range remotePeers { - wg.Add(1) - go func(idx int) { - defer wg.Done() - // we use idx+1 because remotePeers slice is 1 position shifted w.r.t cps - errs[idx+1] = invokeServiceCmd(remotePeers[idx], cmd) - }(i) - } - wg.Wait() - errs[0] = invokeServiceCmd(cps[0], cmd) -} - -// listPeerLocksInfo - fetch list of locks held on the given bucket, -// matching prefix held longer than duration from all peer servers. -func listPeerLocksInfo(peers adminPeers, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) { - // Used to aggregate volume lock information from all nodes. - allLocks := make([][]VolumeLockInfo, len(peers)) - errs := make([]error, len(peers)) - var wg sync.WaitGroup - localPeer := peers[0] - remotePeers := peers[1:] - for i, remotePeer := range remotePeers { - wg.Add(1) - go func(idx int, remotePeer adminPeer) { - defer wg.Done() - // `remotePeers` is right-shifted by one position relative to `peers` - allLocks[idx], errs[idx] = remotePeer.cmdRunner.ListLocks(bucket, prefix, duration) - }(i+1, remotePeer) - } - wg.Wait() - allLocks[0], errs[0] = localPeer.cmdRunner.ListLocks(bucket, prefix, duration) - - // Summarizing errors received for ListLocks RPC across all - // nodes. N B the possible unavailability of quorum in errors - // applies only to distributed setup. - errCount, err := reduceErrs(errs, []error{}) - if err != nil { - if errCount >= (len(peers)/2 + 1) { - return nil, err - } - return nil, InsufficientReadQuorum{} - } - - // Group lock information across nodes by (bucket, object) - // pair. For readability only. - paramLockMap := make(map[nsParam][]VolumeLockInfo) - for _, nodeLocks := range allLocks { - for _, lockInfo := range nodeLocks { - param := nsParam{ - volume: lockInfo.Bucket, - path: lockInfo.Object, - } - paramLockMap[param] = append(paramLockMap[param], lockInfo) - } - } - groupedLockInfos := []VolumeLockInfo{} - for _, volLocks := range paramLockMap { - groupedLockInfos = append(groupedLockInfos, volLocks...) - } - return groupedLockInfos, nil -} - -// uptimeSlice - used to sort uptimes in chronological order. -type uptimeSlice []struct { - err error - uptime time.Duration -} - -func (ts uptimeSlice) Len() int { - return len(ts) -} - -func (ts uptimeSlice) Less(i, j int) bool { - return ts[i].uptime < ts[j].uptime -} - -func (ts uptimeSlice) Swap(i, j int) { - ts[i], ts[j] = ts[j], ts[i] -} - -// getPeerUptimes - returns the uptime since the last time read quorum -// was established on success. Otherwise returns errXLReadQuorum. -func getPeerUptimes(peers adminPeers) (time.Duration, error) { - // In a single node Erasure or FS backend setup the uptime of - // the setup is the uptime of the single minio server - // instance. - if !globalIsDistXL { - return UTCNow().Sub(globalBootTime), nil - } - - uptimes := make(uptimeSlice, len(peers)) - - // Get up time of all servers. - wg := sync.WaitGroup{} - for i, peer := range peers { - wg.Add(1) - go func(idx int, peer adminPeer) { - defer wg.Done() - serverInfoData, rpcErr := peer.cmdRunner.ServerInfoData() - uptimes[idx].uptime, uptimes[idx].err = serverInfoData.Properties.Uptime, rpcErr - }(i, peer) - } - wg.Wait() - - // Sort uptimes in chronological order. - sort.Sort(uptimes) - - // Pick the readQuorum'th uptime in chronological order. i.e, - // the time at which read quorum was (re-)established. - readQuorum := len(uptimes) / 2 - validCount := 0 - latestUptime := time.Duration(0) - for _, uptime := range uptimes { - if uptime.err != nil { - logger.LogIf(context.Background(), uptime.err) - continue - } - - validCount++ - if validCount >= readQuorum { - latestUptime = uptime.uptime - break - } - } - - // Less than readQuorum "Admin.Uptime" RPC call returned - // successfully, so read-quorum unavailable. - if validCount < readQuorum { - return time.Duration(0), InsufficientReadQuorum{} - } - - return latestUptime, nil -} - -// getPeerConfig - Fetches config.json from all nodes in the setup and -// returns the one that occurs in a majority of them. -func getPeerConfig(peers adminPeers) ([]byte, error) { - if !globalIsDistXL { - return peers[0].cmdRunner.GetConfig() - } - - errs := make([]error, len(peers)) - configs := make([][]byte, len(peers)) - - // Get config from all servers. - wg := sync.WaitGroup{} - for i, peer := range peers { - wg.Add(1) - go func(idx int, peer adminPeer) { - defer wg.Done() - configs[idx], errs[idx] = peer.cmdRunner.GetConfig() - }(i, peer) - } - wg.Wait() - - // Find the maximally occurring config among peers in a - // distributed setup. - - serverConfigs := make([]serverConfig, len(peers)) - for i, configBytes := range configs { - if errs[i] != nil { - continue - } - - // Unmarshal the received config files. - err := json.Unmarshal(configBytes, &serverConfigs[i]) - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", peers[i].addr) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - return nil, err - } - } - - configJSON, err := getValidServerConfig(serverConfigs, errs) - if err != nil { - logger.LogIf(context.Background(), err) - return nil, err - } - - // Return the config.json that was present quorum or more - // number of disks. - return json.Marshal(configJSON) -} - -// getValidServerConfig - finds the server config that is present in -// quorum or more number of servers. -func getValidServerConfig(serverConfigs []serverConfig, errs []error) (scv serverConfig, e error) { - // majority-based quorum - quorum := len(serverConfigs)/2 + 1 - - // Count the number of disks a config.json was found in. - configCounter := make([]int, len(serverConfigs)) - - // We group equal serverConfigs by the lowest index of the - // same value; e.g, let us take the following serverConfigs - // in a 4-node setup, - // serverConfigs == [c1, c2, c1, c1] - // configCounter == [3, 1, 0, 0] - // c1, c2 are the only distinct values that appear. c1 is - // identified by 0, the lowest index it appears in and c2 is - // identified by 1. So, we need to find the number of times - // each of these distinct values occur. - - // Invariants: - - // 1. At the beginning of the i-th iteration, the number of - // unique configurations seen so far is equal to the number of - // non-zero counter values in config[:i]. - - // 2. At the beginning of the i-th iteration, the sum of - // elements of configCounter[:i] is equal to the number of - // non-error configurations seen so far. - - // For each of the serverConfig ... - for i := range serverConfigs { - // Skip nodes where getConfig failed. - if errs[i] != nil { - continue - } - // Check if it is equal to any of the configurations - // seen so far. If j == i is reached then we have an - // unseen configuration. - for j := 0; j <= i; j++ { - if j < i && configCounter[j] == 0 { - // serverConfigs[j] is known to be - // equal to a value that was already - // seen. See example above for - // clarity. - continue - } else if j < i && serverConfigs[i].ConfigDiff(&serverConfigs[j]) == "" { - // serverConfigs[i] is equal to - // serverConfigs[j], update - // serverConfigs[j]'s counter since it - // is the lower index. - configCounter[j]++ - break - } else if j == i { - // serverConfigs[i] is equal to no - // other value seen before. It is - // unique so far. - configCounter[i] = 1 - break - } // else invariants specified above are violated. - } - } - - // We find the maximally occurring server config and check if - // there is quorum. - var configJSON serverConfig - maxOccurrence := 0 - for i, count := range configCounter { - if maxOccurrence < count { - maxOccurrence = count - configJSON = serverConfigs[i] - } - } - - // If quorum nodes don't agree. - if maxOccurrence < quorum { - return scv, errXLWriteQuorum - } - - return configJSON, nil -} - -// Write config contents into a temporary file on all nodes. -func writeTmpConfigPeers(peers adminPeers, tmpFileName string, configBytes []byte) []error { - // For a single-node minio server setup. - if !globalIsDistXL { - err := peers[0].cmdRunner.WriteTmpConfig(tmpFileName, configBytes) - return []error{err} - } - - errs := make([]error, len(peers)) - - // Write config into temporary file on all nodes. - wg := sync.WaitGroup{} - for i, peer := range peers { - wg.Add(1) - go func(idx int, peer adminPeer) { - defer wg.Done() - errs[idx] = peer.cmdRunner.WriteTmpConfig(tmpFileName, configBytes) - }(i, peer) - } - wg.Wait() - - // Return bytes written and errors (if any) during writing - // temporary config file. - return errs -} - -// Move config contents from the given temporary file onto config.json -// on all nodes. -func commitConfigPeers(peers adminPeers, tmpFileName string) []error { - // For a single-node minio server setup. - if !globalIsDistXL { - return []error{peers[0].cmdRunner.CommitConfig(tmpFileName)} - } - - errs := make([]error, len(peers)) - - // Rename temporary config file into configDir/config.json on - // all nodes. - wg := sync.WaitGroup{} - for i, peer := range peers { - wg.Add(1) - go func(idx int, peer adminPeer) { - defer wg.Done() - errs[idx] = peer.cmdRunner.CommitConfig(tmpFileName) - }(i, peer) - } - wg.Wait() - - // Return errors (if any) received during rename. - return errs -} diff --git a/cmd/admin-rpc_test.go b/cmd/admin-rpc_test.go index d26a17db2..e0f1a8bb8 100644 --- a/cmd/admin-rpc_test.go +++ b/cmd/admin-rpc_test.go @@ -139,10 +139,8 @@ func testAdminCmdRunnerServerInfo(t *testing.T, client adminCmdRunner) { }() endpoints := new(EndpointList) - notificationSys, err := NewNotificationSys(globalServerConfig, *endpoints) - if err != nil { - t.Fatalf("unexpected error %v", err) - } + + notificationSys := NewNotificationSys(globalServerConfig, *endpoints) testCases := []struct { bootTime time.Time diff --git a/cmd/config-current.go b/cmd/config-current.go index 8ebb9b8c6..8628e7373 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -17,6 +17,7 @@ package cmd import ( + "context" "errors" "fmt" "reflect" @@ -402,17 +403,19 @@ func loadConfig() error { // * Add a new target in pkg/event/target package. // * Add newly added target configuration to serverConfig.Notify.. // * Handle the configuration in this function to create/add into TargetList. -func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { +func getNotificationTargets(config *serverConfig) *event.TargetList { targetList := event.NewTargetList() for id, args := range config.Notify.AMQP { if args.Enable { newTarget, err := target.NewAMQPTarget(id, args) if err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } } } @@ -421,10 +424,14 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { if args.Enable { newTarget, err := target.NewElasticsearchTarget(id, args) if err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue + } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue + } } } @@ -433,10 +440,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { if args.Enable { newTarget, err := target.NewKafkaTarget(id, args) if err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } } } @@ -445,10 +454,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { if args.Enable { newTarget, err := target.NewMQTTTarget(id, args) if err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } } } @@ -457,10 +468,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { if args.Enable { newTarget, err := target.NewMySQLTarget(id, args) if err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } } } @@ -469,10 +482,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { if args.Enable { newTarget, err := target.NewNATSTarget(id, args) if err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } } } @@ -481,10 +496,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { if args.Enable { newTarget, err := target.NewPostgreSQLTarget(id, args) if err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } } } @@ -493,10 +510,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { if args.Enable { newTarget, err := target.NewRedisTarget(id, args) if err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } if err = targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } } } @@ -505,10 +524,11 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) { if args.Enable { newTarget := target.NewWebhookTarget(id, args) if err := targetList.Add(newTarget); err != nil { - return nil, err + logger.LogIf(context.Background(), err) + continue } } } - return targetList, nil + return targetList } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 1f1681444..bf5e9ed00 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -151,16 +151,6 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { // or cause changes on backend format. fs.fsFormatRlk = rlk - // Initialize notification system. - if err = globalNotificationSys.Init(fs); err != nil { - return nil, uiErrUnableToReadFromBackend(err).Msg("Unable to initialize notification system") - } - - // Initialize policy system. - if err = globalPolicySys.Init(fs); err != nil { - return nil, uiErrUnableToReadFromBackend(err).Msg("Unable to initialize policy system") - } - if !fs.diskMount { go fs.diskUsage(globalServiceDoneCh) } diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 0200dd68f..2102eb404 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -170,8 +170,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { initNSLock(false) // Enable local namespace lock. // Create new notification system. - globalNotificationSys, err = NewNotificationSys(globalServerConfig, EndpointList{}) - logger.FatalIf(err, "Unable to create new notification system") + globalNotificationSys = NewNotificationSys(globalServerConfig, EndpointList{}) // Create new policy system. globalPolicySys = NewPolicySys() diff --git a/cmd/notification.go b/cmd/notification.go index d2423fb83..ac1e046ea 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -429,12 +429,8 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr { } // NewNotificationSys - creates new notification system object. -func NewNotificationSys(config *serverConfig, endpoints EndpointList) (*NotificationSys, error) { - targetList, err := getNotificationTargets(config) - if err != nil { - return nil, err - } - +func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys { + targetList := getNotificationTargets(config) peerRPCClientMap := makeRemoteRPCClients(endpoints) // bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init() @@ -443,7 +439,7 @@ func NewNotificationSys(config *serverConfig, endpoints EndpointList) (*Notifica bucketRulesMap: make(map[string]event.RulesMap), bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), peerRPCClientMap: peerRPCClientMap, - }, nil + } } type eventArgs struct { diff --git a/cmd/server-main.go b/cmd/server-main.go index 5092b4651..e6f15315e 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -271,15 +271,6 @@ func serverMain(ctx *cli.Context) { logger.Fatal(uiErrUnexpectedError(err), "Unable to configure one of server's RPC services") } - // Create new notification system. - globalNotificationSys, err = NewNotificationSys(globalServerConfig, globalEndpoints) - if err != nil { - logger.Fatal(err, "Unable to initialize the notification system") - } - - // Create new policy system. - globalPolicySys = NewPolicySys() - // Initialize Admin Peers inter-node communication only in distributed setup. initGlobalAdminPeers(globalEndpoints) @@ -315,6 +306,25 @@ func serverMain(ctx *cli.Context) { initFederatorBackend(newObject) } + // Re-enable logging + logger.Disable = false + + // Create new policy system. + globalPolicySys = NewPolicySys() + + // Initialize policy system. + if err := globalPolicySys.Init(newObjectLayerFn()); err != nil { + logger.Fatal(err, "Unable to initialize policy system") + } + + // Create new notification system. + globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints) + + // Initialize notification system. + if err := globalNotificationSys.Init(newObjectLayerFn()); err != nil { + logger.Fatal(err, "Unable to initialize notification system") + } + // Prints the formatted startup message once object layer is initialized. apiEndpoints := getAPIEndpoints(globalMinioAddr) printStartupMessage(apiEndpoints) @@ -322,9 +332,6 @@ func serverMain(ctx *cli.Context) { // Set uptime time after object layer has initialized. globalBootTime = UTCNow() - // Re-enable logging - logger.Disable = false - handleSignals() } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 48a76757d..e5a26f1ca 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -353,10 +353,8 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer { globalMinioHost = host globalMinioPort = port globalMinioAddr = getEndpointsLocalAddr(testServer.Disks) - globalNotificationSys, err = NewNotificationSys(globalServerConfig, testServer.Disks) - if err != nil { - t.Fatalf("Unable to create new notification system. %v", err) - } + + globalNotificationSys = NewNotificationSys(globalServerConfig, testServer.Disks) // Create new policy system. globalPolicySys = NewPolicySys() @@ -1720,9 +1718,7 @@ func newTestObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err erro } // Create new notification system. - if globalNotificationSys, err = NewNotificationSys(globalServerConfig, endpoints); err != nil { - return nil, err - } + globalNotificationSys = NewNotificationSys(globalServerConfig, endpoints) // Create new policy system. globalPolicySys = NewPolicySys() diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index cab7164bb..cb6f453a8 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -263,16 +263,6 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP // Connect disks right away. s.connectDisks() - // Initialize notification system. - if err := globalNotificationSys.Init(s); err != nil { - return nil, fmt.Errorf("Unable to initialize notification system. %v", err) - } - - // Initialize policy system. - if err := globalPolicySys.Init(s); err != nil { - return nil, fmt.Errorf("Unable to initialize policy system. %v", err) - } - // Start the disk monitoring and connect routine. go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)