fix: getObject fd leaks in transition and replication code (#11237)

master
Harshavardhana 4 years ago committed by GitHub
parent a6dee21092
commit f0808bb2e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      cmd/bucket-lifecycle.go
  2. 55
      cmd/bucket-replication.go
  3. 16
      cmd/bucket-targets.go

@ -340,13 +340,14 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, objInfo Object
return err return err
} }
oi := gr.ObjInfo oi := gr.ObjInfo
if oi.TransitionStatus == lifecycle.TransitionComplete { if oi.TransitionStatus == lifecycle.TransitionComplete {
gr.Close() // make sure to avoid leaks.
return nil return nil
} }
putOpts := putTransitionOpts(oi) putOpts := putTransitionOpts(oi)
if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, "", "", putOpts); err != nil { if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, "", "", putOpts); err != nil {
gr.Close() // make sure to avoid leaks.
return err return err
} }
gr.Close() gr.Close()

@ -441,37 +441,42 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
} }
} }
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err))
return
}
putOpts := putReplicationOpts(ctx, dest, objInfo)
replicationStatus := replication.Complete replicationStatus := replication.Complete
if rtype != replicateAll {
gr.Close()
// Setup bandwidth throttling
peers, _ := globalEndpoints.peers()
totalNodesCount := len(peers)
if totalNodesCount == 0 {
totalNodesCount = 1 // For standalone erasure coding
}
b := target.BandwidthLimit / int64(totalNodesCount)
var headerSize int
for k, v := range putOpts.Header() {
headerSize += len(k) + len(v)
}
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
if rtype == replicateAll {
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
} else {
// replicate metadata for object tagging/copy with metadata replacement // replicate metadata for object tagging/copy with metadata replacement
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}} dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
_, err = tgt.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts) _, err = tgt.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts)
} if err != nil {
replicationStatus = replication.Failed
}
} else {
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err))
gr.Close()
return
}
r.Close() putOpts := putReplicationOpts(ctx, dest, objInfo)
if err != nil { // Setup bandwidth throttling
replicationStatus = replication.Failed peers, _ := globalEndpoints.peers()
totalNodesCount := len(peers)
if totalNodesCount == 0 {
totalNodesCount = 1 // For standalone erasure coding
}
b := target.BandwidthLimit / int64(totalNodesCount)
var headerSize int
for k, v := range putOpts.Header() {
headerSize += len(k) + len(v)
}
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
if err != nil {
replicationStatus = replication.Failed
}
r.Close()
} }
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String() objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
if objInfo.UserTags != "" { if objInfo.UserTags != "" {

@ -68,7 +68,6 @@ func (sys *BucketTargetSys) ListTargets(ctx context.Context, bucket, arnType str
// ListBucketTargets - gets list of bucket targets for this bucket. // ListBucketTargets - gets list of bucket targets for this bucket.
func (sys *BucketTargetSys) ListBucketTargets(ctx context.Context, bucket string) (*madmin.BucketTargets, error) { func (sys *BucketTargetSys) ListBucketTargets(ctx context.Context, bucket string) (*madmin.BucketTargets, error) {
sys.RLock() sys.RLock()
defer sys.RUnlock() defer sys.RUnlock()
@ -130,9 +129,13 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
tgts := sys.targetsMap[bucket] tgts, ok := sys.targetsMap[bucket]
if !ok {
return BucketRemoteTargetNotFound{Bucket: bucket}
}
newtgts := make([]madmin.BucketTarget, len(tgts)) newtgts := make([]madmin.BucketTarget, len(tgts))
labels := make(map[string]struct{}) labels := make(map[string]struct{}, len(tgts))
found := false found := false
for idx, t := range tgts { for idx, t := range tgts {
labels[t.Label] = struct{}{} labels[t.Label] = struct{}{}
@ -198,9 +201,12 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
// delete ARN type from list of matching targets // delete ARN type from list of matching targets
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
targets := make([]madmin.BucketTarget, 0)
found := false found := false
tgts := sys.targetsMap[bucket] tgts, ok := sys.targetsMap[bucket]
if !ok {
return BucketRemoteTargetNotFound{Bucket: bucket}
}
targets := make([]madmin.BucketTarget, 0, len(tgts))
for _, tgt := range tgts { for _, tgt := range tgts {
if tgt.Arn != arnStr { if tgt.Arn != arnStr {
targets = append(targets, tgt) targets = append(targets, tgt)

Loading…
Cancel
Save