You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
401 lines
12 KiB
401 lines
12 KiB
/*
|
|
* Minio Cloud Storage, (C) 2018 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/gorilla/mux"
|
|
"github.com/minio/minio/cmd/logger"
|
|
xrpc "github.com/minio/minio/cmd/rpc"
|
|
"github.com/minio/minio/pkg/event"
|
|
xnet "github.com/minio/minio/pkg/net"
|
|
"github.com/minio/minio/pkg/policy"
|
|
)
|
|
|
|
const peerServiceName = "Peer"
|
|
const peerServiceSubPath = "/peer/remote"
|
|
|
|
var peerServicePath = path.Join(minioReservedBucketPath, peerServiceSubPath)
|
|
|
|
// peerRPCReceiver - Peer RPC receiver for peer RPC server.
|
|
type peerRPCReceiver struct{}
|
|
|
|
// DeleteBucketArgs - delete bucket RPC arguments.
|
|
type DeleteBucketArgs struct {
|
|
AuthArgs
|
|
BucketName string
|
|
}
|
|
|
|
// DeleteBucket - handles delete bucket RPC call which removes all values of given bucket in global NotificationSys object.
|
|
func (receiver *peerRPCReceiver) DeleteBucket(args *DeleteBucketArgs, reply *VoidReply) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
globalNotificationSys.RemoveNotification(args.BucketName)
|
|
globalPolicySys.Remove(args.BucketName)
|
|
return nil
|
|
}
|
|
|
|
// SetBucketPolicyArgs - set bucket policy RPC arguments.
|
|
type SetBucketPolicyArgs struct {
|
|
AuthArgs
|
|
BucketName string
|
|
Policy policy.Policy
|
|
}
|
|
|
|
// SetBucketPolicy - handles set bucket policy RPC call which adds bucket policy to globalPolicySys.
|
|
func (receiver *peerRPCReceiver) SetBucketPolicy(args *SetBucketPolicyArgs, reply *VoidReply) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
globalPolicySys.Set(args.BucketName, args.Policy)
|
|
return nil
|
|
}
|
|
|
|
// RemoveBucketPolicyArgs - delete bucket policy RPC arguments.
|
|
type RemoveBucketPolicyArgs struct {
|
|
AuthArgs
|
|
BucketName string
|
|
}
|
|
|
|
// RemoveBucketPolicy - handles delete bucket policy RPC call which removes bucket policy to globalPolicySys.
|
|
func (receiver *peerRPCReceiver) RemoveBucketPolicy(args *RemoveBucketPolicyArgs, reply *VoidReply) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
globalPolicySys.Remove(args.BucketName)
|
|
return nil
|
|
}
|
|
|
|
// PutBucketNotificationArgs - put bucket notification RPC arguments.
|
|
type PutBucketNotificationArgs struct {
|
|
AuthArgs
|
|
BucketName string
|
|
RulesMap event.RulesMap
|
|
}
|
|
|
|
// PutBucketNotification - handles put bucket notification RPC call which adds rules to given bucket to global NotificationSys object.
|
|
func (receiver *peerRPCReceiver) PutBucketNotification(args *PutBucketNotificationArgs, reply *VoidReply) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
globalNotificationSys.AddRulesMap(args.BucketName, args.RulesMap)
|
|
return nil
|
|
}
|
|
|
|
// ListenBucketNotificationArgs - listen bucket notification RPC arguments.
|
|
type ListenBucketNotificationArgs struct {
|
|
AuthArgs `json:"-"`
|
|
BucketName string `json:"-"`
|
|
EventNames []event.Name `json:"eventNames"`
|
|
Pattern string `json:"pattern"`
|
|
TargetID event.TargetID `json:"targetId"`
|
|
Addr xnet.Host `json:"addr"`
|
|
}
|
|
|
|
// ListenBucketNotification - handles listen bucket notification RPC call.
|
|
// It creates PeerRPCClient target which pushes requested events to target in remote peer.
|
|
func (receiver *peerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *VoidReply) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
rpcClient := globalNotificationSys.GetPeerRPCClient(args.Addr)
|
|
if rpcClient == nil {
|
|
return fmt.Errorf("unable to find PeerRPCClient for provided address %v. This happens only if remote and this minio run with different set of endpoints", args.Addr)
|
|
}
|
|
|
|
target := NewPeerRPCClientTarget(args.BucketName, args.TargetID, rpcClient)
|
|
rulesMap := event.NewRulesMap(args.EventNames, args.Pattern, target.ID())
|
|
if err := globalNotificationSys.AddRemoteTarget(args.BucketName, target, rulesMap); err != nil {
|
|
reqInfo := &logger.ReqInfo{BucketName: target.bucketName}
|
|
reqInfo.AppendTags("target", target.id.Name)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RemoteTargetExistArgs - remote target ID exist RPC arguments.
|
|
type RemoteTargetExistArgs struct {
|
|
AuthArgs
|
|
BucketName string
|
|
TargetID event.TargetID
|
|
}
|
|
|
|
// RemoteTargetExist - handles target ID exist RPC call which checks whether given target ID is a HTTP client target or not.
|
|
func (receiver *peerRPCReceiver) RemoteTargetExist(args *RemoteTargetExistArgs, reply *bool) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
*reply = globalNotificationSys.RemoteTargetExist(args.BucketName, args.TargetID)
|
|
return nil
|
|
}
|
|
|
|
// SendEventArgs - send event RPC arguments.
|
|
type SendEventArgs struct {
|
|
AuthArgs
|
|
Event event.Event
|
|
TargetID event.TargetID
|
|
BucketName string
|
|
}
|
|
|
|
// SendEvent - handles send event RPC call which sends given event to target by given target ID.
|
|
func (receiver *peerRPCReceiver) SendEvent(args *SendEventArgs, reply *bool) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
// Set default to true to keep the target.
|
|
*reply = true
|
|
errs := globalNotificationSys.send(args.BucketName, args.Event, args.TargetID)
|
|
|
|
for i := range errs {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("Event", args.Event.EventName.String())
|
|
reqInfo.AppendTags("targetName", args.TargetID.Name)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, errs[i].Err)
|
|
|
|
*reply = false // send failed i.e. do not keep the target.
|
|
return errs[i].Err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReloadFormatArgs - send event RPC arguments.
|
|
type ReloadFormatArgs struct {
|
|
AuthArgs
|
|
DryRun bool
|
|
}
|
|
|
|
// ReloadFormat - handles reload format RPC call, reloads latest `format.json`
|
|
func (receiver *peerRPCReceiver) ReloadFormat(args *ReloadFormatArgs, reply *VoidReply) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
return objAPI.ReloadFormat(context.Background(), args.DryRun)
|
|
}
|
|
|
|
// LoadUsers - handles load users RPC call.
|
|
func (receiver *peerRPCReceiver) LoadUsers(args *AuthArgs, reply *VoidReply) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
return globalIAMSys.Load(objAPI)
|
|
}
|
|
|
|
// LoadCredentials - handles load credentials RPC call.
|
|
func (receiver *peerRPCReceiver) LoadCredentials(args *AuthArgs, reply *VoidReply) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
// Construct path to config.json for the given bucket.
|
|
configFile := path.Join(bucketConfigPrefix, minioConfigFile)
|
|
transactionConfigFile := configFile + ".transaction"
|
|
|
|
// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
|
|
// and configFile, take a transaction lock to avoid race.
|
|
objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
|
|
if err := objLock.GetRLock(globalOperationTimeout); err != nil {
|
|
return err
|
|
}
|
|
objLock.RUnlock()
|
|
|
|
return globalConfigSys.Load(newObjectLayerFn())
|
|
}
|
|
|
|
// DrivePerfInfo - handles drive performance RPC call
|
|
func (receiver *peerRPCReceiver) DrivePerfInfo(args *AuthArgs, reply *ServerDrivesPerfInfo) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
*reply = localEndpointsDrivePerf(globalEndpoints)
|
|
return nil
|
|
}
|
|
|
|
// CPULoadInfo - handles cpu performance RPC call
|
|
func (receiver *peerRPCReceiver) CPULoadInfo(args *AuthArgs, reply *ServerCPULoadInfo) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
*reply = localEndpointsCPULoad(globalEndpoints)
|
|
return nil
|
|
}
|
|
|
|
// MemUsageInfo - handles mem utilization RPC call
|
|
func (receiver *peerRPCReceiver) MemUsageInfo(args *AuthArgs, reply *ServerMemUsageInfo) error {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
*reply = localEndpointsMemUsage(globalEndpoints)
|
|
return nil
|
|
}
|
|
|
|
// uptimes - used to sort uptimes in chronological order.
|
|
type uptimes []time.Duration
|
|
|
|
func (ts uptimes) Len() int {
|
|
return len(ts)
|
|
}
|
|
|
|
func (ts uptimes) Less(i, j int) bool {
|
|
return ts[i] < ts[j]
|
|
}
|
|
|
|
func (ts uptimes) Swap(i, j int) {
|
|
ts[i], ts[j] = ts[j], ts[i]
|
|
}
|
|
|
|
// getPeerUptimes - returns the uptime.
|
|
func getPeerUptimes(serverInfo []ServerInfo) time.Duration {
|
|
// In a single node Erasure or FS backend setup the uptime of
|
|
// the setup is the uptime of the single minio server
|
|
// instance.
|
|
if !globalIsDistXL {
|
|
return UTCNow().Sub(globalBootTime)
|
|
}
|
|
|
|
var times []time.Duration
|
|
|
|
for _, info := range serverInfo {
|
|
if info.Error != "" {
|
|
continue
|
|
}
|
|
times = append(times, info.Data.Properties.Uptime)
|
|
}
|
|
|
|
// Sort uptimes in chronological order.
|
|
sort.Sort(uptimes(times))
|
|
|
|
// Return the latest time as the uptime.
|
|
return times[0]
|
|
}
|
|
|
|
// StartProfilingArgs - holds the RPC argument for StartingProfiling RPC call
|
|
type StartProfilingArgs struct {
|
|
AuthArgs
|
|
Profiler string
|
|
}
|
|
|
|
// StartProfiling - profiling server receiver.
|
|
func (receiver *peerRPCReceiver) StartProfiling(args *StartProfilingArgs, reply *VoidReply) error {
|
|
if globalProfiler != nil {
|
|
globalProfiler.Stop()
|
|
}
|
|
var err error
|
|
globalProfiler, err = startProfiler(args.Profiler, "")
|
|
return err
|
|
}
|
|
|
|
// DownloadProfilingData - download profiling data.
|
|
func (receiver *peerRPCReceiver) DownloadProfilingData(args *AuthArgs, reply *[]byte) error {
|
|
var err error
|
|
*reply, err = getProfileData()
|
|
return err
|
|
}
|
|
|
|
var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported")
|
|
|
|
// SignalServiceArgs - send event RPC arguments.
|
|
type SignalServiceArgs struct {
|
|
AuthArgs
|
|
Sig serviceSignal
|
|
}
|
|
|
|
// SignalService - signal service receiver.
|
|
func (receiver *peerRPCReceiver) SignalService(args *SignalServiceArgs, reply *VoidReply) error {
|
|
switch args.Sig {
|
|
case serviceRestart, serviceStop:
|
|
globalServiceSignalCh <- args.Sig
|
|
default:
|
|
return errUnsupportedSignal
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ServerInfo - server info receiver.
|
|
func (receiver *peerRPCReceiver) ServerInfo(args *AuthArgs, reply *ServerInfoData) error {
|
|
if globalBootTime.IsZero() {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
// Build storage info
|
|
objLayer := newObjectLayerFn()
|
|
if objLayer == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
// Server info data.
|
|
*reply = ServerInfoData{
|
|
StorageInfo: objLayer.StorageInfo(context.Background()),
|
|
ConnStats: globalConnStats.toServerConnStats(),
|
|
HTTPStats: globalHTTPStats.toServerHTTPStats(),
|
|
Properties: ServerProperties{
|
|
Uptime: UTCNow().Sub(globalBootTime),
|
|
Version: Version,
|
|
CommitID: CommitID,
|
|
SQSARN: globalNotificationSys.GetARNList(),
|
|
Region: globalServerConfig.GetRegion(),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewPeerRPCServer - returns new peer RPC server.
|
|
func NewPeerRPCServer() (*xrpc.Server, error) {
|
|
rpcServer := xrpc.NewServer()
|
|
if err := rpcServer.RegisterName(peerServiceName, &peerRPCReceiver{}); err != nil {
|
|
return nil, err
|
|
}
|
|
return rpcServer, nil
|
|
}
|
|
|
|
// registerPeerRPCRouter - creates and registers Peer RPC server and its router.
|
|
func registerPeerRPCRouter(router *mux.Router) {
|
|
rpcServer, err := NewPeerRPCServer()
|
|
logger.FatalIf(err, "Unable to initialize peer RPC Server")
|
|
subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter()
|
|
subrouter.Path(peerServiceSubPath).HandlerFunc(httpTraceHdrs(rpcServer.ServeHTTP))
|
|
}
|
|
|