|
|
|
@ -87,7 +87,7 @@ func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object strin |
|
|
|
|
|
|
|
|
|
// mustReplicate returns true if object meets replication criteria.
|
|
|
|
|
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool { |
|
|
|
|
if s3Err := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone { |
|
|
|
|
if s3Err := isPutActionAllowed(getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
return mustReplicater(ctx, r, bucket, object, meta, replStatus) |
|
|
|
@ -304,9 +304,14 @@ var ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func newReplicationState() *replicationState { |
|
|
|
|
return &replicationState{ |
|
|
|
|
rs := &replicationState{ |
|
|
|
|
replicaCh: make(chan ObjectInfo, globalReplicationConcurrent*2), |
|
|
|
|
} |
|
|
|
|
go func() { |
|
|
|
|
<-GlobalContext.Done() |
|
|
|
|
close(rs.replicaCh) |
|
|
|
|
}() |
|
|
|
|
return rs |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// addWorker creates a new worker to process tasks
|
|
|
|
@ -316,7 +321,6 @@ func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer) |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
close(r.replicaCh) |
|
|
|
|
return |
|
|
|
|
case oi, ok := <-r.replicaCh: |
|
|
|
|
if !ok { |
|
|
|
|