Simplify and cleanup metadata r/w functions (#8146)

Branch: master
Author: Harshavardhana (5 years ago), committed by kannappanr
Parent: a7be313230
Commit: 53e4887e02
Changed files:
  1. cmd/disk-cache.go (2 changes)
  2. cmd/format-xl.go (6 changes)
  3. cmd/xl-v1-bucket.go (8 changes)
  4. cmd/xl-v1-healing.go (2 changes)
  5. cmd/xl-v1-metadata.go (48 changes)
  6. cmd/xl-v1-multipart.go (30 changes)
  7. cmd/xl-v1-object.go (10 changes)
  8. cmd/xl-v1-utils.go (17 changes)
  9. pkg/cpu/cpu.go (2 changes)
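The bulk of this diff is one mechanical cleanup: every var wg = &sync.WaitGroup{} (or wg := sync.WaitGroup{}) becomes var wg sync.WaitGroup. The zero value of a sync.WaitGroup is ready to use, and all of its methods have pointer receivers, so calling wg.Add/wg.Done/wg.Wait on a plain local variable behaves the same without allocating a separate object. A minimal sketch of the fan-out shape these functions share (the disk names and goroutine body are illustrative, not MinIO code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	disks := []string{"disk1", "disk2", "disk3"}

	// The zero value of sync.WaitGroup is ready to use; its methods have
	// pointer receivers, so wg.Add/wg.Done/wg.Wait on a local variable
	// behave exactly like the old &sync.WaitGroup{} form.
	var wg sync.WaitGroup
	errs := make([]error, len(disks))

	for i, d := range disks {
		wg.Add(1)
		go func(i int, d string) {
			defer wg.Done()
			// Each goroutine writes only its own slot, so no mutex is needed.
			errs[i] = nil
			fmt.Println("processed", d)
		}(i, d)
	}
	wg.Wait()
	fmt.Println(errs)
}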

cmd/disk-cache.go
@@ -449,7 +449,7 @@ func checkAtimeSupport(dir string) (err error) {
 func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) {
     logger.StartupMessage(colorBlue("Cache migration initiated ...."))
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     errs := make([]error, len(c.cache))
     for i, dc := range c.cache {
         if dc == nil {

cmd/format-xl.go
@@ -325,7 +325,7 @@ func quorumUnformattedDisks(errs []error) bool {
 // loadFormatXLAll - load all format config from all input disks in parallel.
 func loadFormatXLAll(storageDisks []StorageAPI) ([]*formatXLV3, []error) {
     // Initialize sync waitgroup.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Initialize list of errors.
     var sErrs = make([]error, len(storageDisks))

@@ -652,7 +652,7 @@ func formatXLV3Check(reference *formatXLV3, format *formatXLV3) error {
 func saveFormatXLAll(ctx context.Context, storageDisks []StorageAPI, formats []*formatXLV3) error {
     var errs = make([]error, len(storageDisks))
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Write `format.json` to all disks.
     for index, disk := range storageDisks {

@@ -812,7 +812,7 @@ func initFormatXLMetaVolume(storageDisks []StorageAPI, formats []*formatXLV3) er
     // This happens for the first time, but keep this here since this
     // is the only place where it can be made expensive optimizing all
     // other calls. Create minio meta volume, if it doesn't exist yet.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Initialize errs to collect errors inside go-routine.
     var errs = make([]error, len(storageDisks))

cmd/xl-v1-bucket.go
@@ -43,7 +43,7 @@ func (xl xlObjects) MakeBucketWithLocation(ctx context.Context, bucket, location
     }
     // Initialize sync waitgroup.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Initialize list of errors.
     var dErrs = make([]error, len(xl.getDisks()))

@@ -82,7 +82,7 @@ func (xl xlObjects) MakeBucketWithLocation(ctx context.Context, bucket, location
 func (xl xlObjects) undoDeleteBucket(bucket string) {
     // Initialize sync waitgroup.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Undo previous make bucket entry on all underlying storage disks.
     for index, disk := range xl.getDisks() {
         if disk == nil {

@@ -103,7 +103,7 @@ func (xl xlObjects) undoDeleteBucket(bucket string) {
 // undo make bucket operation upon quorum failure.
 func undoMakeBucket(storageDisks []StorageAPI, bucket string) {
     // Initialize sync waitgroup.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Undo previous make bucket entry on all underlying storage disks.
     for index, disk := range storageDisks {
         if disk == nil {

@@ -245,7 +245,7 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
     defer bucketLock.Unlock()
     // Collect if all disks report volume not found.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     var dErrs = make([]error, len(xl.getDisks()))
     // Remove a volume entry on all underlying storage disks.

cmd/xl-v1-healing.go
@@ -57,7 +57,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
     dryRun bool) (res madmin.HealResultItem, err error) {
     // Initialize sync waitgroup.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Initialize list of errors.
     var dErrs = make([]error, len(storageDisks))

cmd/xl-v1-metadata.go
@@ -451,7 +451,7 @@ func renameXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcEnt
 // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
 func writeUniqueXLMetadata(ctx context.Context, disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) ([]StorageAPI, error) {
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     var mErrs = make([]error, len(disks))
     // Start writing `xl.json` to all disks in parallel.

@@ -461,58 +461,22 @@ func writeUniqueXLMetadata(ctx context.Context, disks []StorageAPI, bucket, pref
             continue
         }
         wg.Add(1)
-        // Write `xl.json` in a routine.
-        go func(index int, disk StorageAPI) {
-            defer wg.Done()
         // Pick one xlMeta for a disk at index.
         xlMetas[index].Erasure.Index = index + 1
-            // Write unique `xl.json` for a disk at index.
-            err := writeXLMetadata(ctx, disk, bucket, prefix, xlMetas[index])
-            if err != nil {
-                mErrs[index] = err
-            }
-        }(index, disk)
-    }
-
-    // Wait for all the routines.
-    wg.Wait()
-
-    err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
-    return evalDisks(disks, mErrs), err
-}
-
-// writeSameXLMetadata - write `xl.json` on all disks in order.
-func writeSameXLMetadata(ctx context.Context, disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum int) ([]StorageAPI, error) {
-    var wg = &sync.WaitGroup{}
-    var mErrs = make([]error, len(disks))
-
-    // Start writing `xl.json` to all disks in parallel.
-    for index, disk := range disks {
-        if disk == nil {
-            mErrs[index] = errDiskNotFound
-            continue
-        }
-        wg.Add(1)
         // Write `xl.json` in a routine.
-        go func(index int, disk StorageAPI, metadata xlMetaV1) {
+        go func(index int, disk StorageAPI, xlMeta xlMetaV1) {
             defer wg.Done()
-            // Save the disk order index.
-            metadata.Erasure.Index = index + 1
-
-            // Write xl metadata.
-            err := writeXLMetadata(ctx, disk, bucket, prefix, metadata)
-            if err != nil {
-                mErrs[index] = err
-            }
-        }(index, disk, xlMeta)
+            // Write unique `xl.json` for a disk at index.
+            mErrs[index] = writeXLMetadata(ctx, disk, bucket, prefix, xlMeta)
+        }(index, disk, xlMetas[index])
     }
     // Wait for all the routines.
     wg.Wait()
-    err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, writeQuorum)
+    err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
     return evalDisks(disks, mErrs), err
 }
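The other recurring simplification shows up here: the goroutine assigns the helper's return value straight into its own slot of the error slice (mErrs[index] = writeXLMetadata(...)) instead of branching on a non-nil error first; writing a nil error just leaves the slot nil, so the quorum check after wg.Wait() sees exactly the same data. A rough, self-contained sketch of this write-then-reduce shape, where writeMeta and the inline quorum count are stand-ins rather than MinIO's actual writeXLMetadata and reduceWriteQuorumErrs:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// writeMeta stands in for a per-disk metadata write.
func writeMeta(disk string) error {
	if disk == "" {
		return errors.New("disk not found")
	}
	return nil
}

// writeAll writes to every disk in parallel and then checks write quorum once.
func writeAll(disks []string, quorum int) ([]error, error) {
	var wg sync.WaitGroup
	mErrs := make([]error, len(disks))
	for index, disk := range disks {
		wg.Add(1)
		go func(index int, disk string) {
			defer wg.Done()
			// Assign the result directly; a nil error simply leaves the slot nil.
			mErrs[index] = writeMeta(disk)
		}(index, disk)
	}
	wg.Wait()

	// Reduce the per-disk errors against the write quorum.
	success := 0
	for _, err := range mErrs {
		if err == nil {
			success++
		}
	}
	if success < quorum {
		return mErrs, errors.New("write quorum not met")
	}
	return mErrs, nil
}

func main() {
	errs, err := writeAll([]string{"d1", "d2", ""}, 2)
	fmt.Println(errs, err)
}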

cmd/xl-v1-multipart.go
@@ -56,7 +56,7 @@ func (xl xlObjects) checkUploadIDExists(ctx context.Context, bucket, object, upl
 // Removes part given by partName belonging to a mulitpart upload from minioMetaBucket
 func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) {
     curpartPath := path.Join(bucket, object, uploadID, partName)
-    wg := sync.WaitGroup{}
+    var wg sync.WaitGroup
     for i, disk := range xl.getDisks() {
         if disk == nil {
             continue

@@ -103,7 +103,7 @@ func (xl xlObjects) statPart(ctx context.Context, bucket, object, uploadID, part
 // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
 func commitXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) ([]StorageAPI, error) {
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     var mErrs = make([]error, len(disks))
     srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)

@@ -123,13 +123,7 @@ func commitXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcPre
             defer disk.DeleteFile(srcBucket, srcPrefix)
             // Renames `xl.json` from source prefix to destination prefix.
-            rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
-            if rErr != nil {
-                logger.LogIf(ctx, rErr)
-                mErrs[index] = rErr
-                return
-            }
-            mErrs[index] = nil
+            mErrs[index] = disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
         }(index, disk)
     }
     // Wait for all the routines.

@@ -218,16 +212,23 @@ func (xl xlObjects) newMultipartUpload(ctx context.Context, bucket string, objec
     // success.
     defer xl.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath, writeQuorum, false)
+    onlineDisks := xl.getDisks()
+    var partsMetadata = make([]xlMetaV1, len(onlineDisks))
+    for i := range onlineDisks {
+        partsMetadata[i] = xlMeta
+    }
+
+    var err error
     // Write updated `xl.json` to all disks.
-    disks, err := writeSameXLMetadata(ctx, xl.getDisks(), minioMetaTmpBucket, tempUploadIDPath, xlMeta, writeQuorum)
+    onlineDisks, err = writeUniqueXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, writeQuorum)
     if err != nil {
         return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
     }
     // Attempt to rename temp upload object to actual upload path object
-    _, rErr := rename(ctx, disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, true, writeQuorum, nil)
-    if rErr != nil {
-        return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
+    _, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, true, writeQuorum, nil)
+    if err != nil {
+        return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
     }
     // Return success.
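The hunk above is what allows writeSameXLMetadata to be deleted: rather than keeping a separate helper that writes one identical xl.json to every disk, newMultipartUpload copies the single xlMeta into a per-disk slice and pushes it through writeUniqueXLMetadata. A tiny sketch of that replication step (the struct here is a simplified stand-in, not the real xlMetaV1):

package main

import "fmt"

// xlMeta is a simplified stand-in, just enough to show the replication step.
type xlMeta struct{ Version string }

func main() {
	nDisks := 4
	meta := xlMeta{Version: "1.0.1"}

	// Copy the single metadata value into one entry per disk so the
	// unique-write helper can also cover the "same metadata everywhere" case.
	partsMetadata := make([]xlMeta, nDisks)
	for i := range partsMetadata {
		partsMetadata[i] = meta
	}
	fmt.Println(partsMetadata)
}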
@@ -456,7 +457,8 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
     defer xl.deleteObject(ctx, minioMetaTmpBucket, tempXLMetaPath, writeQuorum, false)
     // Writes a unique `xl.json` each disk carrying new checksum related information.
-    if onlineDisks, err = writeUniqueXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, writeQuorum); err != nil {
+    onlineDisks, err = writeUniqueXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, writeQuorum)
+    if err != nil {
         return pi, toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath)
     }

cmd/xl-v1-object.go
@@ -33,7 +33,7 @@ var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
 // putObjectDir hints the bottom layer to create a new directory.
 func (xl xlObjects) putObjectDir(ctx context.Context, bucket, object string, writeQuorum int) error {
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     errs := make([]error, len(xl.getDisks()))
     // Prepare object creation in all disks

@@ -335,7 +335,7 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO
 // getObjectInfoDir - This getObjectInfo is specific to object directory lookup.
 func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     errs := make([]error, len(xl.getDisks()))
     // Prepare object creation in a all disks

@@ -423,7 +423,7 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o
 }
 func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) {
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Undo rename object on disks where RenameFile succeeded.
     // If srcEntry/dstEntry are objects then add a trailing slash to copy

@@ -453,7 +453,7 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str
 // the respective underlying storage layer representations.
 func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int, ignoredErr []error) ([]StorageAPI, error) {
     // Initialize sync waitgroup.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Initialize list of errors.
     var errs = make([]error, len(disks))

@@ -737,7 +737,7 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri
     }
     // Initialize sync waitgroup.
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Initialize list of errors.
     var dErrs = make([]error, len(disks))

cmd/xl-v1-utils.go
@@ -174,14 +174,8 @@ func readXLMeta(ctx context.Context, disk StorageAPI, bucket string, object stri
     if len(xlMetaBuf) == 0 {
         return xlMetaV1{}, errFileNotFound
     }
-    xlMeta, err = xlMetaV1UnmarshalJSON(ctx, xlMetaBuf)
-    if err != nil {
-        logger.GetReqInfo(ctx).AppendTags("disk", disk.String())
-        logger.LogIf(ctx, err)
-        return xlMetaV1{}, err
-    }
-    // Return structured `xl.json`.
-    return xlMeta, nil
+    logger.GetReqInfo(ctx).AppendTags("disk", disk.String())
+    return xlMetaV1UnmarshalJSON(ctx, xlMetaBuf)
 }
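readXLMeta now tags the request context with the disk name up front and returns whatever xlMetaV1UnmarshalJSON returns, dropping the capture-check-log-return sequence. A minimal illustration of the same shape, using encoding/json as a stand-in for the real unmarshal helper:

package main

import (
	"encoding/json"
	"fmt"
)

type meta struct{ Version string }

// parseMeta mirrors the cleanup: return the unmarshal result directly
// instead of capturing the error, branching on it, and returning it again.
func parseMeta(buf []byte) (meta, error) {
	var m meta
	err := json.Unmarshal(buf, &m)
	return m, err
}

func main() {
	fmt.Println(parseMeta([]byte(`{"Version":"1.0.1"}`)))
	fmt.Println(parseMeta([]byte(`not json`)))
}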
@@ -189,7 +183,7 @@ func readXLMeta(ctx context.Context, disk StorageAPI, bucket string, object stri
 // Reads all `xl.json` metadata as a xlMetaV1 slice.
 func readAllXLMetadata(ctx context.Context, disks []StorageAPI, bucket, object string) ([]xlMetaV1, []error) {
     errs := make([]error, len(disks))
     metadataArray := make([]xlMetaV1, len(disks))
-    var wg = &sync.WaitGroup{}
+    var wg sync.WaitGroup
     // Read `xl.json` parallelly across disks.
     for index, disk := range disks {
         if disk == nil {

@@ -200,12 +194,7 @@ func readAllXLMetadata(ctx context.Context, disks []StorageAPI, bucket, object s
         // Read `xl.json` in routine.
         go func(index int, disk StorageAPI) {
             defer wg.Done()
-            var err error
-            metadataArray[index], err = readXLMeta(ctx, disk, bucket, object)
-            if err != nil {
-                errs[index] = err
-                return
-            }
+            metadataArray[index], errs[index] = readXLMeta(ctx, disk, bucket, object)
         }(index, disk)
     }

pkg/cpu/cpu.go
@@ -100,7 +100,7 @@ func GetHistoricLoad() Load {
 // for the process currently
 func GetLoad() Load {
     vals := make(chan time.Duration, 3)
-    wg := sync.WaitGroup{}
+    var wg sync.WaitGroup
     for i := 0; i < cpuLoadSampleSize; i++ {
         cpuCounter, err := newCounter()
         if err != nil {
