fix: do not crash PutObjectTags when node is down (#10940)

fixes #10939
master
Harshavardhana 4 years ago committed by GitHub
parent 251c1ef6da
commit 95675b0c9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      cmd/erasure-multipart.go
  2. 47
      cmd/erasure-object.go
  3. 8
      pkg/bucket/lifecycle/action_string.go

@ -819,10 +819,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// Update all erasure metadata, make sure to not modify fields like // Update all erasure metadata, make sure to not modify fields like
// checksum which are different on each disks. // checksum which are different on each disks.
for index := range partsMetadata { for index := range partsMetadata {
partsMetadata[index].Size = fi.Size if partsMetadata[index].IsValid() {
partsMetadata[index].ModTime = fi.ModTime partsMetadata[index].Size = fi.Size
partsMetadata[index].Metadata = fi.Metadata partsMetadata[index].ModTime = fi.ModTime
partsMetadata[index].Parts = fi.Parts partsMetadata[index].Metadata = fi.Metadata
partsMetadata[index].Parts = fi.Parts
}
} }
// Write final `xl.meta` at uploadID location // Write final `xl.meta` at uploadID location

@ -75,9 +75,6 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
if err != nil { if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject) return oi, toObjectErr(err, srcBucket, srcObject)
} }
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi.Erasure.Distribution)
if fi.Deleted { if fi.Deleted {
if srcOpts.VersionID == "" { if srcOpts.VersionID == "" {
return oi, toObjectErr(errFileNotFound, srcBucket, srcObject) return oi, toObjectErr(errFileNotFound, srcBucket, srcObject)
@ -85,6 +82,8 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject) return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
} }
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi.Erasure.Distribution)
versionID := srcInfo.VersionID versionID := srcInfo.VersionID
if srcInfo.versionOnly { if srcInfo.versionOnly {
versionID = dstOpts.VersionID versionID = dstOpts.VersionID
@ -105,9 +104,11 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
// Update `xl.meta` content on each disks. // Update `xl.meta` content on each disks.
for index := range metaArr { for index := range metaArr {
metaArr[index].ModTime = modTime if metaArr[index].IsValid() {
metaArr[index].VersionID = versionID metaArr[index].ModTime = modTime
metaArr[index].Metadata = srcInfo.UserDefined metaArr[index].VersionID = versionID
metaArr[index].Metadata = srcInfo.UserDefined
}
} }
tempObj := mustGetUUID() tempObj := mustGetUUID()
@ -293,6 +294,9 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
if disk == OfflineDisk { if disk == OfflineDisk {
continue continue
} }
if !metaArr[index].IsValid() {
continue
}
checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber) checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber))
readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset, readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset,
@ -1071,9 +1075,6 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
if err != nil { if err != nil {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi.Erasure.Distribution)
if fi.Deleted { if fi.Deleted {
if opts.VersionID == "" { if opts.VersionID == "" {
return toObjectErr(errFileNotFound, bucket, object) return toObjectErr(errFileNotFound, bucket, object)
@ -1081,22 +1082,20 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
return toObjectErr(errMethodNotAllowed, bucket, object) return toObjectErr(errMethodNotAllowed, bucket, object)
} }
for i, fi := range metaArr { onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi.Erasure.Distribution)
if errs[i] != nil { for i, metaFi := range metaArr {
// Avoid disks where loading metadata fail if metaFi.IsValid() {
continue // clean fi.Meta of tag key, before updating the new tags
} delete(metaFi.Metadata, xhttp.AmzObjectTagging)
// Don't update for empty tags
// clean fi.Meta of tag key, before updating the new tags if tags != "" {
delete(fi.Metadata, xhttp.AmzObjectTagging) metaFi.Metadata[xhttp.AmzObjectTagging] = tags
// Don't update for empty tags }
if tags != "" { for k, v := range opts.UserDefined {
fi.Metadata[xhttp.AmzObjectTagging] = tags metaFi.Metadata[k] = v
} }
for k, v := range opts.UserDefined { metaArr[i].Metadata = metaFi.Metadata
fi.Metadata[k] = v
} }
metaArr[i].Metadata = fi.Metadata
} }
tempObj := mustGetUUID() tempObj := mustGetUUID()

@ -11,11 +11,15 @@ func _() {
_ = x[NoneAction-0] _ = x[NoneAction-0]
_ = x[DeleteAction-1] _ = x[DeleteAction-1]
_ = x[DeleteVersionAction-2] _ = x[DeleteVersionAction-2]
_ = x[TransitionAction-3]
_ = x[TransitionVersionAction-4]
_ = x[DeleteRestoredAction-5]
_ = x[DeleteRestoredVersionAction-6]
} }
const _Action_name = "NoneActionDeleteActionDeleteVersionAction" const _Action_name = "NoneActionDeleteActionDeleteVersionActionTransitionActionTransitionVersionActionDeleteRestoredActionDeleteRestoredVersionAction"
var _Action_index = [...]uint8{0, 10, 22, 41} var _Action_index = [...]uint8{0, 10, 22, 41, 57, 80, 100, 127}
func (i Action) String() string { func (i Action) String() string {
if i < 0 || i >= Action(len(_Action_index)-1) { if i < 0 || i >= Action(len(_Action_index)-1) {

Loading…
Cancel
Save