From a8dd7b3eda4e665dbf434815bd832515f883bfc6 Mon Sep 17 00:00:00 2001 From: poornas Date: Thu, 30 Jul 2020 19:55:22 -0700 Subject: [PATCH] Refactor replication target management. (#10154) Generalize replication target management so that remote targets for a bucket can be managed with ARNs. `mc admin bucket remote` command will be used to manage targets. --- cmd/admin-bucket-handlers.go | 120 +++++++++---- cmd/admin-router.go | 10 +- cmd/api-errors.go | 44 ++++- cmd/bucket-handlers.go | 2 +- cmd/bucket-metadata-sys.go | 6 +- cmd/bucket-metadata.go | 6 +- cmd/bucket-replication.go | 178 ++------------------ cmd/bucket-targets.go | 242 +++++++++++++++++++++++++++ cmd/globals.go | 6 +- cmd/object-api-errors.go | 36 +++- cmd/object-handlers.go | 12 +- cmd/server-main.go | 9 +- docs/bucket/replication/README.md | 28 +++- pkg/madmin/examples/bucket-target.go | 15 +- pkg/madmin/remote-target-commands.go | 151 +++++++++++------ 15 files changed, 577 insertions(+), 288 deletions(-) create mode 100644 cmd/bucket-targets.go diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index b3b0b93b0..660c8a389 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -20,6 +20,7 @@ import ( "encoding/json" "io" "io/ioutil" + "net" "net/http" "github.com/gorilla/mux" @@ -139,11 +140,6 @@ func (a adminAPIHandlers) SetBucketTargetHandler(w http.ResponseWriter, r *http. writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) return } - // Turn off replication if disk crawl is unavailable. - if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketReplicationDisabledError), r.URL) - return - } // Check if bucket exists. if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { @@ -156,7 +152,6 @@ func (a adminAPIHandlers) SetBucketTargetHandler(w http.ResponseWriter, r *http. writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) return } - password := cred.SecretKey reqBytes, err := madmin.DecryptData(password, io.LimitReader(r.Body, r.ContentLength)) @@ -169,32 +164,61 @@ func (a adminAPIHandlers) SetBucketTargetHandler(w http.ResponseWriter, r *http. writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) return } - target.Arn = globalBucketReplicationSys.getReplicationARN(target.URL()) - tgtBytes, err := json.Marshal(&target) + host, port, err := net.SplitHostPort(target.Endpoint) if err != nil { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) return } - if err = globalBucketMetadataSys.Update(bucket, bucketTargetsFile, tgtBytes); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + sameTarget, _ := isLocalHost(host, port, globalMinioPort) + if sameTarget && bucket == target.TargetBucket { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketRemoteIdenticalToSource), r.URL) + return + } + target.Arn = globalBucketTargetSys.getRemoteARN(bucket, &target) + if target.Arn == "" { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) return } - if err = globalBucketReplicationSys.SetTarget(ctx, bucket, &target); err != nil { + if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } + targets, err := globalBucketTargetSys.ListTargets(ctx, bucket) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + tgtBytes, err := json.Marshal(&targets) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) + return + } + + if err = globalBucketMetadataSys.Update(bucket, bucketTargetsFile, tgtBytes); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + data, err := json.Marshal(target.Arn) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } // Write success response. - writeSuccessResponseHeadersOnly(w) + writeSuccessResponseJSON(w, data) } -// GetBucketTargetHandler - gets remote target for a particular bucket -func (a adminAPIHandlers) GetBucketTargetsHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "GetBucketTarget") +// ListBucketTargetsHandler - lists remote target(s) for a bucket or gets a target +// for a particular ARN type +func (a adminAPIHandlers) ListBucketTargetsHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ListBucketTargets") - defer logger.AuditLog(w, r, "GetBucketTarget", mustGetClaimsFromToken(r)) + defer logger.AuditLog(w, r, "ListBucketTargets", mustGetClaimsFromToken(r)) vars := mux.Vars(r) bucket := vars["bucket"] + arnType := vars["type"] + // Get current object layer instance. objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketTargetAction) if objectAPI == nil { @@ -202,20 +226,28 @@ func (a adminAPIHandlers) GetBucketTargetsHandler(w http.ResponseWriter, r *http return } - target, err := globalBucketMetadataSys.GetReplicationTargetConfig(bucket) + cfg, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket) if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } - - // remove secretKey from creds - var tgt madmin.BucketTarget - if !target.Empty() { - var creds auth.Credentials - creds.AccessKey = target.Credentials.AccessKey - tgt = madmin.BucketTarget{Endpoint: target.Endpoint, TargetBucket: target.TargetBucket, Credentials: &creds, Arn: target.Arn} + var ( + targets []madmin.BucketTarget + tgt, ct madmin.BucketTarget + creds auth.Credentials + ) + if cfg != nil && !cfg.Empty() { + for idx, t := range cfg.Targets { + if string(t.Type) == arnType || arnType == "" { + ct = cfg.Targets[idx] + // remove secretKey from creds + creds.AccessKey = ct.Credentials.AccessKey + tgt = madmin.BucketTarget{Endpoint: ct.Endpoint, Secure: ct.Secure, TargetBucket: ct.TargetBucket, Credentials: &creds, Arn: ct.Arn, Type: ct.Type} + targets = append(targets, tgt) + } + } } - data, err := json.Marshal(tgt) + data, err := json.Marshal(targets) if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return @@ -224,24 +256,44 @@ func (a adminAPIHandlers) GetBucketTargetsHandler(w http.ResponseWriter, r *http writeSuccessResponseJSON(w, data) } -// GetBucketTargetARNHandler - gets replication ARN for a particular remote -func (a adminAPIHandlers) GetBucketTargetARNHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "GetBucketTargetARN") +// RemoveBucketTargetHandler - removes a remote target for bucket with specified ARN +func (a adminAPIHandlers) RemoveBucketTargetHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "RemoveBucketTarget") - defer logger.AuditLog(w, r, "GetBucketTargetARN", mustGetClaimsFromToken(r)) + defer logger.AuditLog(w, r, "RemoveBucketTarget", mustGetClaimsFromToken(r)) vars := mux.Vars(r) - rURL := vars["url"] + bucket := vars["bucket"] + arn := vars["arn"] + // Get current object layer instance. - objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketTargetAction) + objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.SetBucketTargetAction) if objectAPI == nil { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) return } - data, err := json.Marshal(globalBucketReplicationSys.getARN(rURL)) + if !globalIsErasure { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + if err := globalBucketTargetSys.RemoveTarget(ctx, bucket, arn); err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + targets, err := globalBucketTargetSys.ListTargets(ctx, bucket) if err != nil { - writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + tgtBytes, err := json.Marshal(&targets) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) return } + if err = globalBucketMetadataSys.Update(bucket, bucketTargetsFile, tgtBytes); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + // Write success response. - writeSuccessResponseJSON(w, data) + writeSuccessNoContent(w) } diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 16e060823..dd339b844 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -183,14 +183,14 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) } // Bucket replication operations // GetBucketTargetHandler - adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-target").HandlerFunc( - httpTraceHdrs(adminAPI.GetBucketTargetsHandler)).Queries("bucket", "{bucket:.*}") - // GetBucketTargetARN Handler - adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-target-arn").HandlerFunc( - httpTraceHdrs(adminAPI.GetBucketTargetARNHandler)).Queries("url", "{url:.*}") + adminRouter.Methods(http.MethodGet).Path(adminVersion+"/list-bucket-targets").HandlerFunc( + httpTraceHdrs(adminAPI.ListBucketTargetsHandler)).Queries("bucket", "{bucket:.*}", "type", "{type:.*}") // SetBucketTargetHandler adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-bucket-target").HandlerFunc( httpTraceHdrs(adminAPI.SetBucketTargetHandler)).Queries("bucket", "{bucket:.*}") + // SetBucketTargetHandler + adminRouter.Methods(http.MethodDelete).Path(adminVersion+"/remove-bucket-target").HandlerFunc( + httpTraceHdrs(adminAPI.RemoveBucketTargetHandler)).Queries("bucket", "{bucket:.*}", "arn", "{arn:.*}") } // -- Top APIs -- diff --git a/cmd/api-errors.go b/cmd/api-errors.go index aea59f8f0..4e2fa107a 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -108,6 +108,11 @@ const ( ErrReplicationConfigurationNotFoundError ErrReplicationDestinationNotFoundError ErrReplicationTargetNotFoundError + ErrBucketRemoteIdenticalToSource + ErrBucketRemoteAlreadyExists + ErrBucketRemoteArnTypeInvalid + ErrBucketRemoteArnInvalid + ErrBucketRemoteRemoveDisallowed ErrReplicationTargetNotVersionedError ErrReplicationNeedsVersioningError ErrReplicationBucketNeedsVersioningError @@ -826,14 +831,39 @@ var errorCodes = errorCodeMap{ HTTPStatusCode: http.StatusNotFound, }, ErrReplicationTargetNotFoundError: { - Code: "ReplicationTargetNotFoundError", + Code: "XminioAdminReplicationTargetNotFoundError", Description: "The replication target does not exist", HTTPStatusCode: http.StatusNotFound, }, + ErrBucketRemoteIdenticalToSource: { + Code: "XminioAdminRemoteIdenticalToSource", + Description: "The remote target cannot be identical to source", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrBucketRemoteAlreadyExists: { + Code: "XminioAdminBucketRemoteAlreadyExists", + Description: "The remote target already exists", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrBucketRemoteRemoveDisallowed: { + Code: "XMinioAdminRemoteRemoveDisallowed", + Description: "Replication configuration exists with this ARN.", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrBucketRemoteArnTypeInvalid: { + Code: "XMinioAdminRemoteARNTypeInvalid", + Description: "The bucket remote ARN type is not valid", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrBucketRemoteArnInvalid: { + Code: "XMinioAdminRemoteArnInvalid", + Description: "The bucket remote ARN does not have correct format", + HTTPStatusCode: http.StatusBadRequest, + }, ErrReplicationTargetNotVersionedError: { Code: "ReplicationTargetNotVersionedError", Description: "The replication target does not have versioning enabled", - HTTPStatusCode: http.StatusNotFound, + HTTPStatusCode: http.StatusBadRequest, }, ErrReplicationNeedsVersioningError: { Code: "InvalidRequest", @@ -1879,8 +1909,16 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) { apiErr = ErrReplicationConfigurationNotFoundError case BucketReplicationDestinationNotFound: apiErr = ErrReplicationDestinationNotFoundError - case BucketReplicationTargetNotFound: + case BucketRemoteTargetNotFound: apiErr = ErrReplicationTargetNotFoundError + case BucketRemoteAlreadyExists: + apiErr = ErrBucketRemoteAlreadyExists + case BucketRemoteArnTypeInvalid: + apiErr = ErrBucketRemoteArnTypeInvalid + case BucketRemoteArnInvalid: + apiErr = ErrBucketRemoteArnInvalid + case BucketRemoteRemoveDisallowed: + apiErr = ErrBucketRemoteRemoveDisallowed case BucketReplicationTargetNotVersioned: apiErr = ErrReplicationTargetNotVersionedError case BucketQuotaExceeded: diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 57e9fc59f..9934591ce 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -1271,7 +1271,7 @@ func (api objectAPIHandlers) PutBucketReplicationConfigHandler(w http.ResponseWr writeErrorResponse(ctx, w, apiErr, r.URL, guessIsBrowserReq(r)) return } - sameTarget, err := globalBucketReplicationSys.validateDestination(ctx, bucket, replicationConfig) + sameTarget, err := validateReplicationDestination(ctx, bucket, replicationConfig) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 6b871a619..544429882 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -340,15 +340,15 @@ func (sys *BucketMetadataSys) GetReplicationConfig(ctx context.Context, bucket s return meta.replicationConfig, nil } -// GetReplicationTargetConfig returns configured bucket replication target for this bucket +// GetBucketTargetsConfig returns configured bucket targets for this bucket // The returned object may not be modified. -func (sys *BucketMetadataSys) GetReplicationTargetConfig(bucket string) (*madmin.BucketTarget, error) { +func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.BucketTargets, error) { meta, err := sys.GetConfig(bucket) if err != nil { return nil, err } if meta.bucketTargetConfig == nil { - return nil, BucketReplicationTargetNotFound{Bucket: bucket} + return nil, BucketRemoteTargetNotFound{Bucket: bucket} } return meta.bucketTargetConfig, nil } diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index faa454e72..cd0b319e0 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -85,7 +85,7 @@ type BucketMetadata struct { taggingConfig *tags.Tags quotaConfig *madmin.BucketQuota replicationConfig *replication.Config - bucketTargetConfig *madmin.BucketTarget + bucketTargetConfig *madmin.BucketTargets } // newBucketMetadata creates BucketMetadata with the supplied name and Created to Now. @@ -100,7 +100,7 @@ func newBucketMetadata(name string) BucketMetadata { versioningConfig: &versioning.Versioning{ XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/", }, - bucketTargetConfig: &madmin.BucketTarget{}, + bucketTargetConfig: &madmin.BucketTargets{}, } } @@ -232,7 +232,7 @@ func (b *BucketMetadata) parseAllConfigs(ctx context.Context, objectAPI ObjectLa return err } } else { - b.bucketTargetConfig = &madmin.BucketTarget{} + b.bucketTargetConfig = &madmin.BucketTargets{} } return nil } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 73cea798b..6f770c26e 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -18,35 +18,22 @@ package cmd import ( "context" - "fmt" "net/http" - "strings" - "sync" "time" miniogo "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bucket/replication" - "github.com/minio/minio/pkg/bucket/versioning" "github.com/minio/minio/pkg/event" iampolicy "github.com/minio/minio/pkg/iam/policy" - "github.com/minio/minio/pkg/madmin" ) -// BucketReplicationSys represents replication subsystem -type BucketReplicationSys struct { - sync.RWMutex - targetsMap map[string]*miniogo.Core - targetsARNMap map[string]string -} - -// GetConfig - gets replication config associated to a given bucket name. -func (sys *BucketReplicationSys) GetConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) { +// gets replication config associated to a given bucket name. +func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) { if globalIsGateway { objAPI := newObjectLayerWithoutSafeModeFn() if objAPI == nil { @@ -59,153 +46,29 @@ func (sys *BucketReplicationSys) GetConfig(ctx context.Context, bucketName strin return globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName) } -// SetTarget - sets a new minio-go client replication target for this bucket. -func (sys *BucketReplicationSys) SetTarget(ctx context.Context, bucket string, tgt *madmin.BucketTarget) error { - if globalIsGateway { - return nil - } - // delete replication targets that were removed - if tgt.Empty() { - sys.Lock() - if currTgt, ok := sys.targetsMap[bucket]; ok { - delete(sys.targetsARNMap, currTgt.EndpointURL().String()) - } - delete(sys.targetsMap, bucket) - sys.Unlock() - return nil - } - clnt, err := getReplicationTargetClient(tgt) - if err != nil { - return BucketReplicationTargetNotFound{Bucket: tgt.TargetBucket} - } - ok, err := clnt.BucketExists(ctx, tgt.TargetBucket) - if err != nil { - return err - } - if !ok { - return BucketReplicationDestinationNotFound{Bucket: tgt.TargetBucket} - } - vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket) - if err != nil || vcfg.Status != string(versioning.Enabled) { - return BucketReplicationTargetNotVersioned{Bucket: tgt.TargetBucket} - } - sys.Lock() - sys.targetsMap[bucket] = clnt - sys.targetsARNMap[tgt.URL()] = tgt.Arn - sys.Unlock() - return nil -} - -// GetTargetClient returns minio-go client for target instance -func (sys *BucketReplicationSys) GetTargetClient(ctx context.Context, bucket string) *miniogo.Core { - var clnt *miniogo.Core - sys.RLock() - if c, ok := sys.targetsMap[bucket]; ok { - clnt = c - } - sys.RUnlock() - return clnt -} - -// validateDestination returns error if replication destination bucket missing or not configured +// validateReplicationDestination returns error if replication destination bucket missing or not configured // It also returns true if replication destination is same as this server. -func (sys *BucketReplicationSys) validateDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) { - clnt := sys.GetTargetClient(ctx, bucket) +func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) { + clnt := globalBucketTargetSys.GetReplicationTargetClient(ctx, rCfg.ReplicationArn) if clnt == nil { - return false, BucketReplicationTargetNotFound{Bucket: bucket} + return false, BucketRemoteTargetNotFound{Bucket: bucket} } if found, _ := clnt.BucketExists(ctx, rCfg.GetDestination().Bucket); !found { return false, BucketReplicationDestinationNotFound{Bucket: rCfg.GetDestination().Bucket} } // validate replication ARN against target endpoint - for k, v := range sys.targetsARNMap { - if v == rCfg.ReplicationArn { - if k == clnt.EndpointURL().String() { - sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort) - return sameTarget, nil - } - } - } - return false, BucketReplicationTargetNotFound{Bucket: bucket} -} - -// NewBucketReplicationSys - creates new replication system. -func NewBucketReplicationSys() *BucketReplicationSys { - return &BucketReplicationSys{ - targetsMap: make(map[string]*miniogo.Core), - targetsARNMap: make(map[string]string), - } -} - -// Init initializes the bucket replication subsystem for buckets with replication config -func (sys *BucketReplicationSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { - if objAPI == nil { - return errServerNotInitialized - } - - // In gateway mode, replication is not supported. - if globalIsGateway { - return nil - } - - // Load bucket replication targets once during boot. - sys.load(ctx, buckets, objAPI) - return nil -} - -// create minio-go clients for buckets having replication targets -func (sys *BucketReplicationSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { - for _, bucket := range buckets { - tgt, err := globalBucketMetadataSys.GetReplicationTargetConfig(bucket.Name) - if err != nil { - continue - } - if tgt == nil || tgt.Empty() { - continue - } - tgtClient, err := getReplicationTargetClient(tgt) - if err != nil { - continue + c, ok := globalBucketTargetSys.arnRemotesMap[rCfg.ReplicationArn] + if ok { + if c.EndpointURL().String() == clnt.EndpointURL().String() { + sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort) + return sameTarget, nil } - sys.Lock() - sys.targetsMap[bucket.Name] = tgtClient - sys.targetsARNMap[tgt.URL()] = tgt.Arn - sys.Unlock() - } -} - -// GetARN returns the ARN associated with replication target URL -func (sys *BucketReplicationSys) getARN(endpoint string) string { - return sys.targetsARNMap[endpoint] -} - -// getReplicationTargetInstanceTransport contains a singleton roundtripper. -var getReplicationTargetInstanceTransport http.RoundTripper -var getReplicationTargetInstanceTransportOnce sync.Once - -// Returns a minio-go Client configured to access remote host described in replication target config. -var getReplicationTargetClient = func(tcfg *madmin.BucketTarget) (*miniogo.Core, error) { - config := tcfg.Credentials - // if Signature version '4' use NewV4 directly. - creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, "") - // if Signature version '2' use NewV2 directly. - if strings.ToUpper(tcfg.API) == "S3V2" { - creds = credentials.NewStaticV2(config.AccessKey, config.SecretKey, "") } - - getReplicationTargetInstanceTransportOnce.Do(func() { - getReplicationTargetInstanceTransport = NewGatewayHTTPTransport() - }) - core, err := miniogo.NewCore(tcfg.Endpoint, &miniogo.Options{ - Creds: creds, - Secure: tcfg.Secure, - Transport: getReplicationTargetInstanceTransport, - }) - return core, err + return false, BucketRemoteTargetNotFound{Bucket: bucket} } // mustReplicate returns true if object meets replication criteria. -func (sys *BucketReplicationSys) mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool { +func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool { if globalIsGateway { return false } @@ -218,7 +81,7 @@ func (sys *BucketReplicationSys) mustReplicate(ctx context.Context, r *http.Requ if s3Err := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone { return false } - cfg, err := globalBucketReplicationSys.GetConfig(ctx, bucket) + cfg, err := getReplicationConfig(ctx, bucket) if err != nil { return false } @@ -279,12 +142,12 @@ func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOp // replicateObject replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. func replicateObject(ctx context.Context, bucket, object, versionID string, objectAPI ObjectLayer, eventArg *eventArgs, healPending bool) { - cfg, err := globalBucketReplicationSys.GetConfig(ctx, bucket) + cfg, err := getReplicationConfig(ctx, bucket) if err != nil { logger.LogIf(ctx, err) return } - tgt := globalBucketReplicationSys.GetTargetClient(ctx, bucket) + tgt := globalBucketTargetSys.GetReplicationTargetClient(ctx, cfg.ReplicationArn) if tgt == nil { return } @@ -344,12 +207,3 @@ func replicateObject(ctx context.Context, bucket, object, versionID string, obje logger.LogIf(ctx, err) } } - -// getReplicationARN gets existing ARN for an endpoint or generates a new one. -func (sys *BucketReplicationSys) getReplicationARN(endpoint string) string { - arn, ok := sys.targetsARNMap[endpoint] - if ok { - return arn - } - return fmt.Sprintf("arn:minio:s3::%s:*", mustGetUUID()) -} diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go new file mode 100644 index 000000000..5d60a3006 --- /dev/null +++ b/cmd/bucket-targets.go @@ -0,0 +1,242 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "fmt" + "net/http" + "sync" + + miniogo "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/minio/minio/pkg/bucket/versioning" + "github.com/minio/minio/pkg/madmin" +) + +// BucketTargetSys represents bucket targets subsystem +type BucketTargetSys struct { + sync.RWMutex + arnRemotesMap map[string]*miniogo.Core + targetsMap map[string][]madmin.BucketTarget + clientsCache map[string]*miniogo.Core +} + +// ListTargets - gets list of bucket targets for this bucket. +func (sys *BucketTargetSys) ListTargets(ctx context.Context, bucket string) (*madmin.BucketTargets, error) { + if globalIsGateway { + return nil, nil + } + tgts, ok := sys.targetsMap[bucket] + if ok { + return &madmin.BucketTargets{Targets: tgts}, nil + } + return nil, fmt.Errorf("No remote targets exist for bucket %s", bucket) +} + +// SetTarget - sets a new minio-go client target for this bucket. +func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *madmin.BucketTarget) error { + if globalIsGateway { + return nil + } + if !tgt.Type.IsValid() { + return BucketRemoteArnTypeInvalid{Bucket: bucket} + } + clnt, err := sys.getRemoteTargetClient(tgt) + if err != nil { + return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket} + } + + if tgt.Type == madmin.ReplicationArn { + vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket) + if err != nil || vcfg.Status != string(versioning.Enabled) { + if isErrBucketNotFound(err) { + return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket} + } + return BucketReplicationTargetNotVersioned{Bucket: tgt.TargetBucket} + } + } + + sys.Lock() + defer sys.Unlock() + + tgts := sys.targetsMap[bucket] + newtgts := make([]madmin.BucketTarget, len(tgts)) + found := false + for idx, t := range tgts { + if t.Type == tgt.Type { + if t.Arn == tgt.Arn { + return BucketRemoteAlreadyExists{Bucket: t.TargetBucket} + } + newtgts[idx] = *tgt + found = true + continue + } + newtgts[idx] = t + } + if !found { + newtgts = append(newtgts, *tgt) + } + + sys.targetsMap[bucket] = newtgts + sys.arnRemotesMap[tgt.Arn] = clnt + if _, ok := sys.clientsCache[clnt.EndpointURL().String()]; !ok { + sys.clientsCache[clnt.EndpointURL().String()] = clnt + } + return nil +} + +// RemoveTarget - removes a remote bucket target for this source bucket. +func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr string) error { + if globalIsGateway { + return nil + } + if arnStr == "" { + return BucketRemoteArnInvalid{Bucket: bucket} + } + arn, err := madmin.ParseARN(arnStr) + if err != nil { + return BucketRemoteArnInvalid{Bucket: bucket} + } + if arn.Type == madmin.ReplicationArn { + // reject removal of remote target if replication configuration is present + rcfg, err := getReplicationConfig(ctx, bucket) + if err == nil && rcfg.ReplicationArn == arnStr { + if _, ok := sys.arnRemotesMap[arnStr]; ok { + return BucketRemoteRemoveDisallowed{Bucket: bucket} + } + } + } + // delete ARN type from list of matching targets + sys.Lock() + defer sys.Unlock() + targets := make([]madmin.BucketTarget, 0) + tgts := sys.targetsMap[bucket] + for _, tgt := range tgts { + if tgt.Arn != arnStr { + targets = append(targets, tgt) + } + } + sys.targetsMap[bucket] = targets + delete(sys.arnRemotesMap, arnStr) + return nil +} + +// GetReplicationTargetClient returns minio-go client for replication target instance +func (sys *BucketTargetSys) GetReplicationTargetClient(ctx context.Context, arn string) *miniogo.Core { + sys.RLock() + defer sys.RUnlock() + return sys.arnRemotesMap[arn] +} + +// NewBucketTargetSys - creates new replication system. +func NewBucketTargetSys() *BucketTargetSys { + return &BucketTargetSys{ + arnRemotesMap: make(map[string]*miniogo.Core), + targetsMap: make(map[string][]madmin.BucketTarget), + clientsCache: make(map[string]*miniogo.Core), + } +} + +// Init initializes the bucket targets subsystem for buckets which have targets configured. +func (sys *BucketTargetSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { + if objAPI == nil { + return errServerNotInitialized + } + + // In gateway mode, bucket targets is not supported. + if globalIsGateway { + return nil + } + + // Load bucket targets once during boot. + sys.load(ctx, buckets, objAPI) + return nil +} + +// create minio-go clients for buckets having remote targets +func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { + for _, bucket := range buckets { + cfg, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket.Name) + if err != nil { + continue + } + if cfg == nil || cfg.Empty() { + continue + } + if len(cfg.Targets) > 0 { + sys.targetsMap[bucket.Name] = cfg.Targets + } + for _, tgt := range cfg.Targets { + tgtClient, err := sys.getRemoteTargetClient(&tgt) + if err != nil { + continue + } + sys.arnRemotesMap[tgt.Arn] = tgtClient + if _, ok := sys.clientsCache[tgtClient.EndpointURL().String()]; !ok { + sys.clientsCache[tgtClient.EndpointURL().String()] = tgtClient + } + } + sys.targetsMap[bucket.Name] = cfg.Targets + } +} + +// getRemoteTargetInstanceTransport contains a singleton roundtripper. +var getRemoteTargetInstanceTransport http.RoundTripper +var getRemoteTargetInstanceTransportOnce sync.Once + +// Returns a minio-go Client configured to access remote host described in replication target config. +func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*miniogo.Core, error) { + if clnt, ok := sys.clientsCache[tcfg.Endpoint]; ok { + return clnt, nil + } + config := tcfg.Credentials + creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, "") + + getRemoteTargetInstanceTransportOnce.Do(func() { + getRemoteTargetInstanceTransport = NewGatewayHTTPTransport() + }) + core, err := miniogo.NewCore(tcfg.Endpoint, &miniogo.Options{ + Creds: creds, + Secure: tcfg.Secure, + Transport: getRemoteTargetInstanceTransport, + }) + return core, err +} + +// getRemoteARN gets existing ARN for an endpoint or generates a new one. +func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTarget) string { + if target == nil { + return "" + } + tgts := sys.targetsMap[bucket] + for _, tgt := range tgts { + if tgt.Type == target.Type && tgt.TargetBucket == target.TargetBucket && target.URL() == tgt.URL() { + return tgt.Arn + } + } + if !madmin.ArnType(target.Type).IsValid() { + return "" + } + arn := madmin.ARN{ + Type: target.Type, + ID: mustGetUUID(), + Region: target.Region, + Bucket: target.TargetBucket, + } + return arn.String() +} diff --git a/cmd/globals.go b/cmd/globals.go index 7245e224c..7358611de 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -154,9 +154,9 @@ var ( globalPolicySys *PolicySys globalIAMSys *IAMSys - globalLifecycleSys *LifecycleSys - globalBucketSSEConfigSys *BucketSSEConfigSys - globalBucketReplicationSys *BucketReplicationSys + globalLifecycleSys *LifecycleSys + globalBucketSSEConfigSys *BucketSSEConfigSys + globalBucketTargetSys *BucketTargetSys // globalAPIConfig controls S3 API requests throttling, // healthcheck readiness deadlines and cors settings. globalAPIConfig apiConfig diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index 3afe2da11..ac3f24f5a 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -362,11 +362,39 @@ func (e BucketReplicationDestinationNotFound) Error() string { return "Destination bucket does not exist: " + e.Bucket } -// BucketReplicationTargetNotFound replication target does not exist. -type BucketReplicationTargetNotFound GenericError +// BucketRemoteTargetNotFound remote target does not exist. +type BucketRemoteTargetNotFound GenericError -func (e BucketReplicationTargetNotFound) Error() string { - return "Replication target not found: " + e.Bucket +func (e BucketRemoteTargetNotFound) Error() string { + return "Remote target not found: " + e.Bucket +} + +// BucketRemoteAlreadyExists remote already exists for this target type. +type BucketRemoteAlreadyExists GenericError + +func (e BucketRemoteAlreadyExists) Error() string { + return "Remote already exists for this bucket: " + e.Bucket +} + +// BucketRemoteArnTypeInvalid arn type for remote is not valid. +type BucketRemoteArnTypeInvalid GenericError + +func (e BucketRemoteArnTypeInvalid) Error() string { + return "Remote ARN type not valid: " + e.Bucket +} + +// BucketRemoteArnInvalid arn needs to be specified. +type BucketRemoteArnInvalid GenericError + +func (e BucketRemoteArnInvalid) Error() string { + return "Remote ARN has invalid format: " + e.Bucket +} + +// BucketRemoteRemoveDisallowed when replication configuration exists +type BucketRemoteRemoveDisallowed GenericError + +func (e BucketRemoteRemoveDisallowed) Error() string { + return "Replication configuration exists with this ARN:" + e.Bucket } // BucketReplicationTargetNotVersioned replication target does not have versioning enabled. diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 61e096aad..2fbf8430d 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1177,7 +1177,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if globalBucketReplicationSys.mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()) { + if mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()) { srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -1258,7 +1258,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re objInfo.ETag = getDecryptedETag(r.Header, objInfo, false) response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) - if globalBucketReplicationSys.mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()) { + if mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()) { defer replicateObject(ctx, dstBucket, dstObject, objInfo.VersionID, objectAPI, &eventArgs{ EventName: event.ObjectCreatedCopy, BucketName: dstBucket, @@ -1511,7 +1511,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if globalBucketReplicationSys.mustReplicate(ctx, r, bucket, object, metadata, "") { + if mustReplicate(ctx, r, bucket, object, metadata, "") { metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending) } if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { @@ -1574,7 +1574,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } } - if globalBucketReplicationSys.mustReplicate(ctx, r, bucket, object, metadata, "") { + if mustReplicate(ctx, r, bucket, object, metadata, "") { defer replicateObject(ctx, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{ EventName: event.ObjectCreatedPut, BucketName: bucket, @@ -1696,7 +1696,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if globalBucketReplicationSys.mustReplicate(ctx, r, bucket, object, metadata, "") { + if mustReplicate(ctx, r, bucket, object, metadata, "") { metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending) } // We need to preserve the encryption headers set in EncryptRequest, @@ -2649,7 +2649,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } setPutObjHeaders(w, objInfo, false) - if globalBucketReplicationSys.mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()) { + if mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()) { defer replicateObject(ctx, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{ EventName: event.ObjectCreatedCompleteMultipartUpload, BucketName: bucket, diff --git a/cmd/server-main.go b/cmd/server-main.go index 6f70fe03b..4f4630470 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -170,7 +170,7 @@ func newAllSubsystems() { globalBucketVersioningSys = NewBucketVersioningSys() // Create new bucket replication subsytem - globalBucketReplicationSys = NewBucketReplicationSys() + globalBucketTargetSys = NewBucketTargetSys() } func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) { @@ -340,10 +340,11 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { return fmt.Errorf("Unable to initialize notification system: %w", err) } - // Initialize bucket replication sub-system. - if err = globalBucketReplicationSys.Init(GlobalContext, buckets, newObject); err != nil { - return fmt.Errorf("Unable to initialize bucket replication sub-system: %w", err) + // Initialize bucket targets sub-system. + if err = globalBucketTargetSys.Init(GlobalContext, buckets, newObject); err != nil { + return fmt.Errorf("Unable to initialize bucket target sub-system: %w", err) } + return nil } diff --git a/docs/bucket/replication/README.md b/docs/bucket/replication/README.md index 015f4ff83..113477f09 100644 --- a/docs/bucket/replication/README.md +++ b/docs/bucket/replication/README.md @@ -13,28 +13,42 @@ To replicate objects in a bucket to a destination bucket on a target site either Create a replication target on the source cluster as shown below: ``` -mc admin bucket replication set myminio/srcbucket https://accessKey:secretKey@replica-endpoint:9000/destbucket --path ON --api s3v4 -Replication ARN = 'arn:minio:s3::dadddae7-f1d7-440f-b5d6-651aa9a8c8a7:*' +mc admin bucket remote set myminio/srcbucket https://accessKey:secretKey@replica-endpoint:9000/destbucket --path ON --type "replica" +Replication ARN = 'arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' ``` Note that the admin needs *s3:GetReplicationConfigurationAction* permission on source cluster. The credential used at the destination requires *s3:ReplicateObject* permission. Once successfully created and authorized this generates a replication target ARN. The command below lists all the currently authorized replication targets: ``` -mc admin bucket remote myminio/srcbucket https://replica-endpoint:9000 -Replication ARN = 'arn:minio:s3::dadddae7-f1d7-440f-b5d6-651aa9a8c8a7:*' +mc admin bucket remote list myminio/srcbucket --type "replica" +Replication ARN = 'arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket' ``` The replication configuration can now be added to the source bucket by applying the json file with replication configuration. The ReplicationArn is passed in as a json element in the configuration. ```json { - "ReplicationArn" : "arn:minio:s3::dadddae7-f1d7-440f-b5d6-651aa9a8c8a7:*", + "ReplicationArn" :"arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket", "Rules": [ { "Status": "Enabled", "Priority": 1, "DeleteMarkerReplication": { "Status": "Disabled" }, - "Filter" : { "Prefix": "Tax"}, + "Filter" : { + "And": { + "Prefix": "Tax", + "Tags": [ + { + "Key": "Year", + "Value": "2019" + }, + { + "Key": "Company", + "Value": "AcmeCorp" + } + ] + } + }, "Destination": { "Bucket": "arn:aws:s3:::destbucket", "StorageClass": "STANDARD" @@ -45,7 +59,7 @@ The replication configuration can now be added to the source bucket by applying ``` ``` -mc bucket replicate myminio/srcbucket --config replicate-config.json +mc replicate add myminio/srcbucket --priority 1 --prefix "Tax" --arn "arn:minio:replica::c5be6b16-769d-432a-9ef1-4567081f3566:destbucket" --tags "Year=2019&Company=AcmeCorp" --storage-class "STANDARD" Replication configuration applied successfully to myminio/srcbucket. ``` diff --git a/pkg/madmin/examples/bucket-target.go b/pkg/madmin/examples/bucket-target.go index 29680646f..234706b91 100644 --- a/pkg/madmin/examples/bucket-target.go +++ b/pkg/madmin/examples/bucket-target.go @@ -42,19 +42,24 @@ func main() { if err != nil { log.Fatalln(err) } - target := madmin.BucketTarget{Endpoint: "site2:9000", Credentials: creds, TargetBucket: "destbucket", IsSSL: false} + target := madmin.BucketTarget{Endpoint: "site2:9000", Credentials: creds, TargetBucket: "destbucket", IsSSL: false, Type: madmin.ReplicationArn} // Set bucket target if err := madmClnt.SetBucketTarget(ctx, "srcbucket", &target); err != nil { log.Fatalln(err) } - // Get bucket target - target, err = madmClnt.GetBucketTarget(ctx, "srcbucket") + // List all bucket target(s) + target, err = madmClnt.ListBucketTargets(ctx, "srcbucket", "") + if err != nil { + log.Fatalln(err) + } + // Get bucket target for arn type "replica" + target, err = madmClnt.ListBucketTargets(ctx, "srcbucket", "replica") if err != nil { log.Fatalln(err) } - // Remove bucket target - if err := madmClnt.SetBucketTarget(ctx, "srcbucket", nil); err != nil { + arn := "arn:minio:replica::ac66b2cf-dd8f-4e7e-a882-9a64132f0d59:dest" + if err := madmClnt.RemoveBucketTarget(ctx, "srcbucket", arn); err != nil { log.Fatalln(err) } diff --git a/pkg/madmin/remote-target-commands.go b/pkg/madmin/remote-target-commands.go index a26fe0cdd..0fcc6b90a 100644 --- a/pkg/madmin/remote-target-commands.go +++ b/pkg/madmin/remote-target-commands.go @@ -20,11 +20,11 @@ package madmin import ( "context" "encoding/json" - "errors" "fmt" "io/ioutil" "net/http" "net/url" + "strings" "github.com/minio/minio/pkg/auth" ) @@ -33,13 +33,53 @@ import ( type ArnType string const ( - // Replication specifies a ARN type of replication - Replication ArnType = "replication" + // ReplicationArn specifies a ARN type of replication + ReplicationArn ArnType = "replica" ) // IsValid returns true if ARN type is replication func (t ArnType) IsValid() bool { - return t == Replication + return t == ReplicationArn +} + +// ARN is a struct to define arn. +type ARN struct { + Type ArnType + ID string + Region string + Bucket string +} + +// Empty returns true if arn struct is empty +func (a ARN) Empty() bool { + return !a.Type.IsValid() +} +func (a ARN) String() string { + return fmt.Sprintf("arn:minio:%s:%s:%s:%s", a.Type, a.Region, a.ID, a.Bucket) +} + +// ParseARN return ARN struct from string in arn format. +func ParseARN(s string) (*ARN, error) { + // ARN must be in the format of arn:minio:::: + if !strings.HasPrefix(s, "arn:minio:") { + return nil, fmt.Errorf("Invalid ARN %s", s) + } + + tokens := strings.Split(s, ":") + if len(tokens) != 6 { + return nil, fmt.Errorf("Invalid ARN %s", s) + } + + if tokens[4] == "" || tokens[5] == "" { + return nil, fmt.Errorf("Invalid ARN %s", s) + } + + return &ARN{ + Type: ArnType(tokens[2]), + Region: tokens[3], + ID: tokens[4], + Bucket: tokens[5], + }, nil } // BucketTarget represents the target bucket and site association. @@ -52,9 +92,10 @@ type BucketTarget struct { API string `json:"api,omitempty"` Arn string `json:"arn,omitempty"` Type ArnType `json:"type"` + Region string `json:"omitempty"` } -// URL returns replication target url +// URL returns target url func (t BucketTarget) URL() string { scheme := "http" if t.Secure { @@ -72,50 +113,67 @@ func (t *BucketTarget) String() string { return fmt.Sprintf("%s %s", t.Endpoint, t.TargetBucket) } -// GetBucketTarget - gets target for this bucket -func (adm *AdminClient) GetBucketTarget(ctx context.Context, bucket string) (target BucketTarget, err error) { +// BucketTargets represents a slice of bucket targets by type and endpoint +type BucketTargets struct { + Targets []BucketTarget +} + +// Empty returns true if struct is empty. +func (t BucketTargets) Empty() bool { + if len(t.Targets) == 0 { + return true + } + empty := true + for _, t := range t.Targets { + if !t.Empty() { + return false + } + } + return empty +} + +// ListBucketTargets - gets target(s) for this bucket +func (adm *AdminClient) ListBucketTargets(ctx context.Context, bucket, arnType string) (targets []BucketTarget, err error) { queryValues := url.Values{} queryValues.Set("bucket", bucket) + queryValues.Set("type", arnType) reqData := requestData{ - relPath: adminAPIPrefix + "/get-bucket-target", + relPath: adminAPIPrefix + "/list-bucket-targets", queryValues: queryValues, } - // Execute GET on /minio/admin/v3/get-bucket-target + // Execute GET on /minio/admin/v3/list-bucket-targets resp, err := adm.executeMethod(ctx, http.MethodGet, reqData) defer closeResponse(resp) if err != nil { - return target, err + return targets, err } if resp.StatusCode != http.StatusOK { - return target, httpRespToErrorResponse(resp) + return targets, httpRespToErrorResponse(resp) } b, err := ioutil.ReadAll(resp.Body) if err != nil { - return target, err - } - if err = json.Unmarshal(b, &target); err != nil { - return target, err + return targets, err } - if target.Empty() { - return target, errors.New("No bucket target configured") + if err = json.Unmarshal(b, &targets); err != nil { + return targets, err } - return target, nil + return targets, nil } // SetBucketTarget sets up a remote target for this bucket -func (adm *AdminClient) SetBucketTarget(ctx context.Context, bucket string, target *BucketTarget) error { +func (adm *AdminClient) SetBucketTarget(ctx context.Context, bucket string, target *BucketTarget) (string, error) { data, err := json.Marshal(target) if err != nil { - return err + return "", err } encData, err := EncryptData(adm.getSecretKey(), data) if err != nil { - return err + return "", err } queryValues := url.Values{} queryValues.Set("bucket", bucket) @@ -126,52 +184,49 @@ func (adm *AdminClient) SetBucketTarget(ctx context.Context, bucket string, targ content: encData, } - // Execute PUT on /minio/admin/v3/set-bucket-replication-target to set a replication target for this bucket. + // Execute PUT on /minio/admin/v3/set-bucket-target to set a target for this bucket of specific arn type. resp, err := adm.executeMethod(ctx, http.MethodPut, reqData) defer closeResponse(resp) if err != nil { - return err + return "", err } if resp.StatusCode != http.StatusOK { - return httpRespToErrorResponse(resp) + return "", httpRespToErrorResponse(resp) } - - return nil + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + var arn string + if err = json.Unmarshal(b, &arn); err != nil { + return "", err + } + return arn, nil } -// GetBucketTargetARN - gets Arn for this remote target -func (adm *AdminClient) GetBucketTargetARN(ctx context.Context, rURL string) (arn string, err error) { +// RemoveBucketTarget removes a remote target associated with particular ARN for this bucket +func (adm *AdminClient) RemoveBucketTarget(ctx context.Context, bucket, arn string) error { queryValues := url.Values{} - queryValues.Set("url", rURL) + queryValues.Set("bucket", bucket) + queryValues.Set("arn", arn) reqData := requestData{ - relPath: adminAPIPrefix + "/get-bucket-target-arn", + relPath: adminAPIPrefix + "/remove-bucket-target", queryValues: queryValues, } - // Execute GET on /minio/admin/v3/list-bucket-target-arn - resp, err := adm.executeMethod(ctx, http.MethodGet, reqData) - + // Execute PUT on /minio/admin/v3/remove-bucket-target to remove a target for this bucket + // with specific ARN + resp, err := adm.executeMethod(ctx, http.MethodDelete, reqData) defer closeResponse(resp) if err != nil { - return arn, err - } - - if resp.StatusCode != http.StatusOK { - return arn, httpRespToErrorResponse(resp) + return err } - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - return arn, err - } - if err = json.Unmarshal(b, &arn); err != nil { - return arn, err - } - if arn == "" { - return arn, fmt.Errorf("Missing target ARN") + if resp.StatusCode != http.StatusNoContent { + return httpRespToErrorResponse(resp) } - return arn, nil + return nil }