From 17747db93f9e2ef2450945feb082d0988dbd0a2f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 17 Jul 2020 17:41:29 -0700 Subject: [PATCH] fix: support healing older content (#10076) This PR adds support for healing older content i.e from 2yrs, 1yr. Also handles other situations where our config was not encrypted yet. This PR also ensures that our Listing is consistent and quorum friendly, such that we don't list partial objects --- cmd/admin-heal-ops.go | 40 +++++++++++++++------------------- cmd/config-migrate.go | 3 ++- cmd/config.go | 7 +++--- cmd/erasure-healing.go | 17 +++++++++++++-- cmd/erasure-zones.go | 30 +++++++++++-------------- cmd/iam-etcd-store.go | 3 ++- cmd/iam-object-store.go | 3 ++- cmd/storage-datatypes.go | 3 +++ cmd/xl-storage-format-utils.go | 2 ++ cmd/xl-storage.go | 6 +++++ 10 files changed, 66 insertions(+), 48 deletions(-) diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 5ee76aac6..914686a3e 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -63,7 +63,7 @@ var ( errHealStopSignalled = fmt.Errorf("heal stop signaled") errFnHealFromAPIErr = func(ctx context.Context, err error) error { - apiErr := toAPIError(ctx, err) + apiErr := toAdminAPIErr(ctx, err) return fmt.Errorf("Heal internal error: %s: %s", apiErr.Code, apiErr.Description) } @@ -151,7 +151,7 @@ func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIError) { he, exists := ahs.getHealSequence(path) if !exists { hsp = madmin.HealStopSuccess{ - ClientToken: "invalid", + ClientToken: "unknown", StartTime: UTCNow(), } } else { @@ -193,21 +193,14 @@ func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIError) { func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) ( respBytes []byte, apiErr APIError, errMsg string) { - existsAndLive := false - he, exists := ahs.getHealSequence(pathJoin(h.bucket, h.object)) - if exists { - existsAndLive = !he.hasEnded() - } - - if existsAndLive { - // A heal sequence exists on the given path. - if h.forceStarted { - // stop the running heal sequence - wait for it to finish. - he.stop() - for !he.hasEnded() { - time.Sleep(1 * time.Second) - } - } else { + if h.forceStarted { + _, apiErr = ahs.stopHealSequence(pathJoin(h.bucket, h.object)) + if apiErr.Code != "" { + return respBytes, apiErr, "" + } + } else { + oh, exists := ahs.getHealSequence(pathJoin(h.bucket, h.object)) + if exists && !oh.hasEnded() { errMsg = "Heal is already running on the given path " + "(use force-start option to stop and start afresh). " + fmt.Sprintf("The heal was started by IP %s at %s, token is %s", @@ -224,7 +217,6 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) ( hpath := pathJoin(h.bucket, h.object) for k, hSeq := range ahs.healSeqMap { if !hSeq.hasEnded() && (HasPrefix(k, hpath) || HasPrefix(hpath, k)) { - errMsg = "The provided heal sequence path overlaps with an existing " + fmt.Sprintf("heal path: %s", k) return nil, errorCodes.ToAPIErr(ErrHealOverlappingPaths), errMsg @@ -249,7 +241,7 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) ( }) if err != nil { logger.LogIf(h.ctx, err) - return nil, toAPIError(h.ctx, err), "" + return nil, toAdminAPIErr(h.ctx, err), "" } return b, noError, "" } @@ -264,8 +256,11 @@ func (ahs *allHealState) PopHealStatusJSON(hpath string, // fetch heal state for given path h, exists := ahs.getHealSequence(hpath) if !exists { - // If there is no such heal sequence, return error. - return nil, ErrHealNoSuchProcess + // heal sequence doesn't exist, must have finished. + jbytes, err := json.Marshal(healSequenceStatus{ + Summary: healFinishedStatus, + }) + return jbytes, toAdminAPIErrCode(GlobalContext, err) } // Check if client-token is valid @@ -606,8 +601,7 @@ func (h *healSequence) healSequenceStart() { case <-h.ctx.Done(): h.mutex.Lock() h.endTime = UTCNow() - h.currentStatus.Summary = healStoppedStatus - h.currentStatus.FailureDetail = errHealStopSignalled.Error() + h.currentStatus.Summary = healFinishedStatus h.mutex.Unlock() // drain traverse channel so the traversal diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go index 23e56c795..3fe23cfae 100644 --- a/cmd/config-migrate.go +++ b/cmd/config-migrate.go @@ -24,6 +24,7 @@ import ( "path" "path/filepath" "strings" + "unicode/utf8" "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/config/cache" @@ -2506,7 +2507,7 @@ func checkConfigVersion(objAPI ObjectLayer, configFile string, version string) ( return false, nil, err } - if globalConfigEncrypted { + if globalConfigEncrypted && !utf8.Valid(data) { data, err = madmin.DecryptData(globalActiveCred.String(), bytes.NewReader(data)) if err != nil { if err == madmin.ErrMaliciousData { diff --git a/cmd/config.go b/cmd/config.go index 1fd9a800d..abed9b95a 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -23,6 +23,7 @@ import ( "path" "sort" "strings" + "unicode/utf8" jsoniter "github.com/json-iterator/go" "github.com/minio/minio/cmd/config" @@ -63,7 +64,7 @@ func listServerConfigHistory(ctx context.Context, objAPI ObjectLayer, withData b if err != nil { return nil, err } - if globalConfigEncrypted { + if globalConfigEncrypted && !utf8.Valid(data) { data, err = madmin.DecryptData(globalActiveCred.String(), bytes.NewReader(data)) if err != nil { return nil, err @@ -102,7 +103,7 @@ func readServerConfigHistory(ctx context.Context, objAPI ObjectLayer, uuidKV str return nil, err } - if globalConfigEncrypted { + if globalConfigEncrypted && !utf8.Valid(data) { data, err = madmin.DecryptData(globalActiveCred.String(), bytes.NewReader(data)) } @@ -155,7 +156,7 @@ func readServerConfig(ctx context.Context, objAPI ObjectLayer) (config.Config, e return nil, err } - if globalConfigEncrypted { + if globalConfigEncrypted && !utf8.Valid(configData) { configData, err = madmin.DecryptData(globalActiveCred.String(), bytes.NewReader(configData)) if err != nil { if err == madmin.ErrMaliciousData { diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index dc612d693..9431d778f 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -204,7 +204,7 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, quorumModTime t return true } if erErr == nil { - // If er.meta was read fine but there may be problem with the part.N files. + // If xl.meta was read fine but there may be problem with the part.N files. if IsErr(dataErr, []error{ errFileNotFound, errFileVersionNotFound, @@ -212,6 +212,9 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, quorumModTime t }...) { return true } + if meta.XLV1 { + return true + } if !quorumModTime.Equal(meta.ModTime) { return true } @@ -356,6 +359,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // We write at temporary location and then rename to final location. tmpID := mustGetUUID() + migrateDataDir := mustGetUUID() for i := range outDatedDisks { if outDatedDisks[i] == nil { @@ -396,6 +400,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s } checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(partNumber) partPath := pathJoin(object, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber)) + if latestMeta.XLV1 { + partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber)) + } readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) } writers := make([]io.Writer, len(outDatedDisks)) @@ -404,6 +411,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s continue } partPath := pathJoin(tmpID, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber)) + if latestMeta.XLV1 { + partPath = pathJoin(tmpID, migrateDataDir, fmt.Sprintf("part.%d", partNumber)) + } writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } err = erasure.Heal(ctx, readers, writers, partSize) @@ -427,6 +437,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s continue } + if partsMetadata[i].XLV1 { + partsMetadata[i].DataDir = migrateDataDir + } partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize) partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ PartNumber: partNumber, @@ -458,7 +471,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s } // Attempt a rename now from healed data to final location. - if err = disk.RenameData(minioMetaTmpBucket, tmpID, latestMeta.DataDir, bucket, object); err != nil { + if err = disk.RenameData(minioMetaTmpBucket, tmpID, partsMetadata[0].DataDir, bucket, object); err != nil { if err != errIsNotRegular && err != errFileNotFound { logger.LogIf(ctx, err) } diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 9b0b17fda..1fbf31dd4 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -681,10 +681,9 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, endWalkCh := make(chan struct{}) defer close(endWalkCh) - const ndisks = 3 for _, zone := range z.zones { zonesEntryChs = append(zonesEntryChs, - zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, ndisks)) + zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.drivesPerSet)) } var objInfos []ObjectInfo @@ -709,8 +708,8 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, break } - if quorumCount < ndisks-1 { - // Skip entries which are not found on upto ndisks. + if quorumCount < z.zones[0].drivesPerSet/2 { + // Skip entries which are not found on upto ndisks/2. continue } @@ -797,18 +796,17 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma var zonesEntryChs [][]FileInfoCh var zonesEndWalkCh []chan struct{} - const ndisks = 3 for _, zone := range z.zones { entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, ndisks) + entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.drivesPerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) } - entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, ndisks) + entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet) if len(entries.Files) == 0 { return loi, nil } @@ -888,18 +886,17 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker, var zonesEntryChs [][]FileInfoCh var zonesEndWalkCh []chan struct{} - const ndisks = 3 for _, zone := range z.zones { entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, ndisks) + entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) } - entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, ndisks) + entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet) if len(entries.Files) == 0 { return loi, nil } @@ -1118,8 +1115,8 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i break } - if quorumCount < ndisks-1 { - // Skip entries which are not found on upto ndisks. + if quorumCount < ndisks/2 { + // Skip entries which are not found on upto ndisks/2. continue } @@ -1149,8 +1146,8 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int) break } - if quorumCount < ndisks-1 { - // Skip entries which are not found on upto ndisks. + if quorumCount < ndisks/2 { + // Skip entries which are not found on upto ndisks/2. continue } @@ -1292,18 +1289,17 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m var zonesEntryChs [][]FileInfoVersionsCh var zonesEndWalkCh []chan struct{} - const ndisks = 3 for _, zone := range z.zones { entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, ndisks) + entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) } - entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, ndisks) + entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, z.zones[0].drivesPerSet) if len(entries.FilesVersions) == 0 { return loi, nil } diff --git a/cmd/iam-etcd-store.go b/cmd/iam-etcd-store.go index 54f332bff..3a22a3e1d 100644 --- a/cmd/iam-etcd-store.go +++ b/cmd/iam-etcd-store.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "time" + "unicode/utf8" jwtgo "github.com/dgrijalva/jwt-go" "github.com/minio/minio-go/v7/pkg/set" @@ -120,7 +121,7 @@ func (ies *IAMEtcdStore) loadIAMConfig(item interface{}, path string) error { return err } - if globalConfigEncrypted { + if globalConfigEncrypted && !utf8.Valid(pdata) { pdata, err = madmin.DecryptData(globalActiveCred.String(), bytes.NewReader(pdata)) if err != nil { return err diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index a392c5f61..fe0dc7c10 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -24,6 +24,7 @@ import ( "strings" "sync" "time" + "unicode/utf8" jwtgo "github.com/dgrijalva/jwt-go" @@ -223,7 +224,7 @@ func (iamOS *IAMObjectStore) loadIAMConfig(item interface{}, path string) error if err != nil { return err } - if globalConfigEncrypted { + if globalConfigEncrypted && !utf8.Valid(data) { data, err = madmin.DecryptData(globalActiveCred.String(), bytes.NewReader(data)) if err != nil { return err diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 1490ff98c..7b316aa82 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -80,6 +80,9 @@ type FileInfo struct { // DataDir of the file DataDir string + // Indicates if this object is still in V1 format. + XLV1 bool + // Date and time when the file was last modified, if Deleted // is 'true' this value represents when while was deleted. ModTime time.Time diff --git a/cmd/xl-storage-format-utils.go b/cmd/xl-storage-format-utils.go index 04eef7973..8ed7854c3 100644 --- a/cmd/xl-storage-format-utils.go +++ b/cmd/xl-storage-format-utils.go @@ -50,6 +50,7 @@ func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersion } fi.IsLatest = true // No versions so current version is latest. + fi.XLV1 = true // indicates older version return FileInfoVersions{ Volume: volume, Name: path, @@ -76,5 +77,6 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string) (FileInfo, er if err == errFileNotFound && versionID != "" { return fi, errFileVersionNotFound } + fi.XLV1 = true // indicates older version return fi, err } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 0ddb560ff..6af0f61a7 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1775,6 +1775,9 @@ func (s *xlStorage) CheckParts(volume, path string, fi FileInfo) error { for _, part := range fi.Parts { partPath := pathJoin(path, fi.DataDir, fmt.Sprintf("part.%d", part.Number)) + if fi.XLV1 { + partPath = pathJoin(path, fmt.Sprintf("part.%d", part.Number)) + } filePath := pathJoin(volumeDir, partPath) if err = checkPathLength(filePath); err != nil { return err @@ -2380,6 +2383,9 @@ func (s *xlStorage) VerifyFile(volume, path string, fi FileInfo) (err error) { for _, part := range fi.Parts { checksumInfo := erasure.GetChecksumInfo(part.Number) partPath := pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", part.Number)) + if fi.XLV1 { + partPath = pathJoin(volumeDir, path, fmt.Sprintf("part.%d", part.Number)) + } if err := s.bitrotVerify(partPath, erasure.ShardFileSize(part.Size), checksumInfo.Algorithm,