fix possible crashes on deleteMarker replication (#11308)

Delete marker can have `metaSys` set to nil, that
can lead to crashes after the delete marker has
been healed.

Additionally also fix isObjectDangling check
for transitioned objects, that do not have parts
should be treated similar to Delete marker.
master
Harshavardhana 4 years ago committed by GitHub
parent dac19d7272
commit d1a8f0b786
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      cmd/erasure-healing.go
  2. 8
      cmd/xl-storage-format-v1.go
  3. 31
      cmd/xl-storage-format-v2.go
  4. 3
      cmd/xl-storage.go

@ -25,6 +25,7 @@ import (
"time" "time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/lifecycle"
"github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup" "github.com/minio/minio/pkg/sync/errgroup"
) )
@ -226,8 +227,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
dryRun := opts.DryRun dryRun := opts.DryRun
scanMode := opts.ScanMode scanMode := opts.ScanMode
dataBlocks := lfi.Erasure.DataBlocks
storageDisks := er.getDisks() storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints() storageEndpoints := er.getEndpoints()
@ -244,8 +243,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
Bucket: bucket, Bucket: bucket,
Object: object, Object: object,
DiskCount: len(storageDisks), DiskCount: len(storageDisks),
ParityBlocks: len(storageDisks) / 2, ParityBlocks: er.defaultParityCount,
DataBlocks: len(storageDisks) / 2, DataBlocks: len(storageDisks) - er.defaultParityCount,
} }
// Loop to find number of disks with valid data, per-drive // Loop to find number of disks with valid data, per-drive
@ -316,7 +315,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// If less than read quorum number of disks have all the parts // If less than read quorum number of disks have all the parts
// of the data, we can't reconstruct the erasure-coded data. // of the data, we can't reconstruct the erasure-coded data.
if numAvailableDisks < dataBlocks { if numAvailableDisks < result.DataBlocks {
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, dataErrs, opts) return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, dataErrs, opts)
} }
@ -333,7 +332,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// Latest FileInfo for reference. If a valid metadata is not // Latest FileInfo for reference. If a valid metadata is not
// present, it is as good as object not found. // present, it is as good as object not found.
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataBlocks) latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks)
if err != nil { if err != nil {
return result, toObjectErr(err, bucket, object, versionID) return result, toObjectErr(err, bucket, object, versionID)
} }
@ -363,7 +362,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
dataDir = migrateDataDir dataDir = migrateDataDir
} }
if !latestMeta.Deleted { if !latestMeta.Deleted || latestMeta.TransitionStatus != lifecycle.TransitionComplete {
result.DataBlocks = latestMeta.Erasure.DataBlocks result.DataBlocks = latestMeta.Erasure.DataBlocks
result.ParityBlocks = latestMeta.Erasure.ParityBlocks result.ParityBlocks = latestMeta.Erasure.ParityBlocks
@ -775,8 +774,10 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
break break
} }
if validMeta.Deleted { if validMeta.Deleted || validMeta.TransitionStatus == lifecycle.TransitionComplete {
// notFoundParts is ignored since a delete marker does not have any parts // notFoundParts is ignored since a
// - delete marker does not have any parts
// - transition status of complete has no parts
return validMeta, corruptedErasureMeta+notFoundErasureMeta > len(errs)/2 return validMeta, corruptedErasureMeta+notFoundErasureMeta > len(errs)/2
} }

@ -182,10 +182,6 @@ func (m *xlMetaV1Object) ToFileInfo(volume, path string) (FileInfo, error) {
if !m.valid() { if !m.valid() {
return FileInfo{}, errFileCorrupt return FileInfo{}, errFileCorrupt
} }
var transitionStatus string
if st, ok := m.Meta[ReservedMetadataPrefixLower+"transition-status"]; ok {
transitionStatus = st
}
fi := FileInfo{ fi := FileInfo{
Volume: volume, Volume: volume,
Name: path, Name: path,
@ -197,8 +193,8 @@ func (m *xlMetaV1Object) ToFileInfo(volume, path string) (FileInfo, error) {
VersionID: m.VersionID, VersionID: m.VersionID,
DataDir: m.DataDir, DataDir: m.DataDir,
} }
if transitionStatus != "" { if st, ok := m.Meta[ReservedMetadataPrefixLower+"transition-status"]; ok {
fi.TransitionStatus = transitionStatus fi.TransitionStatus = st
} }
return fi, nil return fi, nil
} }

@ -260,6 +260,7 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
ventry.DeleteMarker = &xlMetaV2DeleteMarker{ ventry.DeleteMarker = &xlMetaV2DeleteMarker{
VersionID: uv, VersionID: uv,
ModTime: fi.ModTime.UnixNano(), ModTime: fi.ModTime.UnixNano(),
MetaSys: make(map[string][]byte),
} }
} else { } else {
ventry.Type = ObjectType ventry.Type = ObjectType
@ -355,13 +356,19 @@ func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error)
versionID = uuid.UUID(j.VersionID).String() versionID = uuid.UUID(j.VersionID).String()
} }
fi := FileInfo{ fi := FileInfo{
Volume: volume, Volume: volume,
Name: path, Name: path,
ModTime: time.Unix(0, j.ModTime).UTC(), ModTime: time.Unix(0, j.ModTime).UTC(),
VersionID: versionID, VersionID: versionID,
Deleted: true, Deleted: true,
DeleteMarkerReplicationStatus: string(j.MetaSys[xhttp.AmzBucketReplicationStatus]), }
VersionPurgeStatus: VersionPurgeStatusType(string(j.MetaSys[VersionPurgeStatusKey])), for k, v := range j.MetaSys {
if strings.EqualFold(k, xhttp.AmzBucketReplicationStatus) {
fi.DeleteMarkerReplicationStatus = string(v)
}
if strings.EqualFold(k, VersionPurgeStatusKey) {
fi.VersionPurgeStatus = VersionPurgeStatusType(string(v))
}
} }
return fi, nil return fi, nil
} }
@ -408,12 +415,15 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
fi.Metadata[k] = v fi.Metadata[k] = v
} }
for k, v := range j.MetaSys { for k, v := range j.MetaSys {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { if strings.EqualFold(strings.ToLower(k), ReservedMetadataPrefixLower+"transition-status") {
fi.Metadata[k] = string(v) fi.TransitionStatus = string(v)
} }
if strings.EqualFold(k, VersionPurgeStatusKey) { if strings.EqualFold(k, VersionPurgeStatusKey) {
fi.VersionPurgeStatus = VersionPurgeStatusType(string(v)) fi.VersionPurgeStatus = VersionPurgeStatusType(string(v))
} }
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
fi.Metadata[k] = string(v)
}
} }
fi.Erasure.Algorithm = j.ErasureAlgorithm.String() fi.Erasure.Algorithm = j.ErasureAlgorithm.String()
fi.Erasure.Index = j.ErasureIndex fi.Erasure.Index = j.ErasureIndex
@ -426,9 +436,6 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
} }
fi.DataDir = uuid.UUID(j.DataDir).String() fi.DataDir = uuid.UUID(j.DataDir).String()
if st, ok := j.MetaSys[ReservedMetadataPrefixLower+"transition-status"]; ok {
fi.TransitionStatus = string(st)
}
return fi, nil return fi, nil
} }

@ -962,7 +962,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
} }
// when data-dir is specified. Transition leverages existing DeleteObject // when data-dir is specified. Transition leverages existing DeleteObject
// api call to mark object as deleted.When object is pending transition, // api call to mark object as deleted. When object is pending transition,
// just update the metadata and avoid deleting data dir. // just update the metadata and avoid deleting data dir.
if dataDir != "" && fi.TransitionStatus != lifecycle.TransitionPending { if dataDir != "" && fi.TransitionStatus != lifecycle.TransitionPending {
filePath := pathJoin(volumeDir, path, dataDir) filePath := pathJoin(volumeDir, path, dataDir)
@ -974,6 +974,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
return err return err
} }
} }
// transitioned objects maintains metadata on the source cluster. When transition // transitioned objects maintains metadata on the source cluster. When transition
// status is set, update the metadata to disk. // status is set, update the metadata to disk.
if !lastVersion || fi.TransitionStatus != "" { if !lastVersion || fi.TransitionStatus != "" {

Loading…
Cancel
Save