Fix format migration regression (#5668)

Migration regression got introduced in 9083bc152e
adding more unit tests to catch this scenario, we need to fix this by
re-writing the formats after the migration to 'V3'.

This bug only happens when a user is migrating directly from V1 to V3,
not from V1 to V2 and V2 to V3.

Added additional unit tests to cover these situations as well.

Fixes #5667
master
Harshavardhana 7 years ago committed by Nitish Tiwari
parent 049090126e
commit 2938e332ba
  1. 50
      cmd/format-xl.go
  2. 91
      cmd/format-xl_test.go
  3. 14
      cmd/prepare-storage.go

@ -232,6 +232,7 @@ func formatXLMigrateV1ToV2(export string) error {
formatV2.Format = formatBackendXL formatV2.Format = formatBackendXL
formatV2.XL.Version = formatXLVersionV2 formatV2.XL.Version = formatXLVersionV2
formatV2.XL.DistributionAlgo = formatXLVersionV2DistributionAlgo formatV2.XL.DistributionAlgo = formatXLVersionV2DistributionAlgo
formatV2.XL.This = formatV1.XL.Disk
formatV2.XL.Sets = make([][]string, 1) formatV2.XL.Sets = make([][]string, 1)
formatV2.XL.Sets[0] = make([]string, len(formatV1.XL.JBOD)) formatV2.XL.Sets[0] = make([]string, len(formatV1.XL.JBOD))
copy(formatV2.XL.Sets[0], formatV1.XL.JBOD) copy(formatV2.XL.Sets[0], formatV1.XL.JBOD)
@ -602,6 +603,55 @@ func initStorageDisks(endpoints EndpointList) ([]StorageAPI, error) {
return storageDisks, nil return storageDisks, nil
} }
// formatXLV3ThisEmpty - find out if '.This' field is empty
// in any of the input `formats`, if yes return true.
func formatXLV3ThisEmpty(formats []*formatXLV3) bool {
for _, format := range formats {
if format == nil {
continue
}
// NOTE: This code is specifically needed when migrating version
// V1 to V2 to V3, in a scenario such as this we only need to handle
// single sets since we never used to support multiple sets in releases
// with V1 format version.
if len(format.XL.Sets) > 1 {
continue
}
if format.XL.This == "" {
return true
}
}
return false
}
// fixFormatXLV3 - fix format XL configuration on all disks.
func fixFormatXLV3(endpoints EndpointList, formats []*formatXLV3) error {
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
return err
}
for i, format := range formats {
if format == nil || !endpoints[i].IsLocal {
continue
}
// NOTE: This code is specifically needed when migrating version
// V1 to V2 to V3, in a scenario such as this we only need to handle
// single sets since we never used to support multiple sets in releases
// with V1 format version.
if len(format.XL.Sets) > 1 {
continue
}
if format.XL.This == "" {
formats[i].XL.This = format.XL.Sets[0][i]
if err = saveFormatXL(storageDisks[i], formats[i]); err != nil {
return err
}
}
}
return nil
}
// initFormatXL - save XL format configuration on all disks. // initFormatXL - save XL format configuration on all disks.
func initFormatXL(endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) { func initFormatXL(endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) {
format = newFormatXLV3(setCount, disksPerSet) format = newFormatXLV3(setCount, disksPerSet)

@ -18,10 +18,12 @@ package cmd
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"reflect"
"testing" "testing"
"github.com/minio/minio/pkg/errors"
) )
// Test get offline/online uuids. // Test get offline/online uuids.
@ -74,6 +76,73 @@ func TestGetUUIDs(t *testing.T) {
} }
} }
// tests fixFormatXLV3 - fix format.json on all disks.
func TestFixFormatV3(t *testing.T) {
xlDirs, err := getRandomDisks(8)
if err != nil {
t.Fatal(err)
}
for _, xlDir := range xlDirs {
defer os.RemoveAll(xlDir)
}
endpoints := mustGetNewEndpointList(xlDirs...)
format := newFormatXLV3(1, 8)
formats := make([]*formatXLV3, 8)
for j := 0; j < 8; j++ {
newFormat := *format
newFormat.XL.This = format.XL.Sets[0][j]
formats[j] = &newFormat
}
if err = initFormatXLMetaVolume(endpoints, formats); err != nil {
t.Fatal(err)
}
formats[1] = nil
expThis := formats[2].XL.This
formats[2].XL.This = ""
if err := fixFormatXLV3(endpoints, formats); err != nil {
t.Fatal(err)
}
newFormats, errs := loadFormatXLAll(endpoints)
for _, err := range errs {
if err != nil && errors.Cause(err) != errUnformattedDisk {
t.Fatal(err)
}
}
gotThis := newFormats[2].XL.This
if expThis != gotThis {
t.Fatalf("expected uuid %s, got %s", expThis, gotThis)
}
}
// tests formatXLV3ThisEmpty conditions.
func TestFormatXLEmpty(t *testing.T) {
format := newFormatXLV3(1, 16)
formats := make([]*formatXLV3, 16)
for j := 0; j < 16; j++ {
newFormat := *format
newFormat.XL.This = format.XL.Sets[0][j]
formats[j] = &newFormat
}
// empty format to indicate disk not found, but this
// empty should return false.
formats[0] = nil
if ok := formatXLV3ThisEmpty(formats); ok {
t.Fatalf("expected value false, got %t", ok)
}
formats[2].XL.This = ""
if ok := formatXLV3ThisEmpty(formats); !ok {
t.Fatalf("expected value true, got %t", ok)
}
}
// Tests format xl get version. // Tests format xl get version.
func TestFormatXLGetVersion(t *testing.T) { func TestFormatXLGetVersion(t *testing.T) {
// Get test root. // Get test root.
@ -201,7 +270,25 @@ func TestFormatXLMigrate(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if migratedVersion != formatXLVersionV3 { if migratedVersion != formatXLVersionV3 {
t.Fatal(fmt.Sprintf("expected version: %s, got: %s", formatXLVersionV3, migratedVersion)) t.Fatalf("expected version: %s, got: %s", formatXLVersionV3, migratedVersion)
}
b, err = ioutil.ReadFile(pathJoin(rootPath, minioMetaBucket, formatConfigFile))
if err != nil {
t.Fatal(err)
}
formatV3 := &formatXLV3{}
if err = json.Unmarshal(b, formatV3); err != nil {
t.Fatal(err)
}
if formatV3.XL.This != m.XL.Disk {
t.Fatalf("expected disk uuid: %s, got: %s", m.XL.Disk, formatV3.XL.This)
}
if len(formatV3.XL.Sets) != 1 {
t.Fatalf("expected single set after migrating from v1 to v3, but found %d", len(formatV3.XL.Sets))
}
if !reflect.DeepEqual(formatV3.XL.Sets[0], m.XL.JBOD) {
t.Fatalf("expected disk uuid: %v, got: %v", m.XL.JBOD, formatV3.XL.Sets[0])
} }
m = &formatXLV1{} m = &formatXLV1{}

@ -147,6 +147,20 @@ func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerS
return initFormatXL(endpoints, setCount, disksPerSet) return initFormatXL(endpoints, setCount, disksPerSet)
} }
// Following function is added to fix a regressions which was introduced
// in release RELEASE.2018-03-16T22-52-12Z after migrating v1 to v2 to v3.
// This migration failed to capture '.This' field properly which indicates
// the disk UUID association. Below function is called to handle and fix
// this regression, for more info refer https://github.com/minio/minio/issues/5667
if err = fixFormatXLV3(endpoints, formatConfigs); err != nil {
return nil, err
}
// If any of the .This field is still empty we wait them to be fixed.
if formatXLV3ThisEmpty(formatConfigs) {
continue
}
format, err = getFormatXLInQuorum(formatConfigs) format, err = getFormatXLInQuorum(formatConfigs)
if err == nil { if err == nil {
for i := range formatConfigs { for i := range formatConfigs {

Loading…
Cancel
Save