fix: make sure to preserve metadata during overwrite in FS mode (#10512)

This bug was introduced in 14f0047295
almost 3yrs ago, as a side effect of removing stale `fs.json`,
but we in fact end up removing the existing good `fs.json` for an
existing object, leading to a form of data loss.

fixes #10496
master
Harshavardhana 4 years ago committed by GitHub
parent dff37aa33d
commit 84bf4624a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      cmd/fs-v1-multipart.go
  2. 24
      cmd/fs-v1.go

@ -19,6 +19,7 @@ package cmd
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -708,13 +709,35 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
return oi, err return oi, err
} }
defer destLock.Unlock() defer destLock.Unlock()
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
metaFile, err := fs.rwPool.Create(fsMetaPath) bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
metaFile, err := fs.rwPool.Write(fsMetaPath)
var freshFile bool
if err != nil { if err != nil {
logger.LogIf(ctx, err) if !errors.Is(err, errFileNotFound) {
return oi, toObjectErr(err, bucket, object) logger.LogIf(ctx, err)
return oi, toObjectErr(err, bucket, object)
}
metaFile, err = fs.rwPool.Create(fsMetaPath)
if err != nil {
logger.LogIf(ctx, err)
return oi, toObjectErr(err, bucket, object)
}
freshFile = true
} }
defer metaFile.Close() defer metaFile.Close()
defer func() {
// Remove meta file when CompleteMultipart encounters
// any error and it is a fresh file.
//
// We should preserve the `fs.json` of any
// existing object
if e != nil && freshFile {
tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)
fsRemoveMeta(ctx, bucketMetaDir, fsMetaPath, tmpDir)
}
}()
// Read saved fs metadata for ongoing multipart. // Read saved fs metadata for ongoing multipart.
fsMetaBuf, err := ioutil.ReadFile(pathJoin(uploadIDDir, fs.metaJSONFile)) fsMetaBuf, err := ioutil.ReadFile(pathJoin(uploadIDDir, fs.metaJSONFile))

@ -1173,18 +1173,30 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
var wlk *lock.LockedFile var wlk *lock.LockedFile
if bucket != minioMetaBucket { if bucket != minioMetaBucket {
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix) bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile) fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
wlk, err = fs.rwPool.Create(fsMetaPath) wlk, err = fs.rwPool.Write(fsMetaPath)
var freshFile bool
if err != nil { if err != nil {
logger.LogIf(ctx, err) if !errors.Is(err, errFileNotFound) {
return ObjectInfo{}, toObjectErr(err, bucket, object) logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
wlk, err = fs.rwPool.Create(fsMetaPath)
if err != nil {
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
freshFile = true
} }
// This close will allow for locks to be synchronized on `fs.json`. // This close will allow for locks to be synchronized on `fs.json`.
defer wlk.Close() defer wlk.Close()
defer func() { defer func() {
// Remove meta file when PutObject encounters any error // Remove meta file when PutObject encounters
if retErr != nil { // any error and it is a fresh file.
//
// We should preserve the `fs.json` of any
// existing object
if retErr != nil && freshFile {
tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID) tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)
fsRemoveMeta(ctx, bucketMetaDir, fsMetaPath, tmpDir) fsRemoveMeta(ctx, bucketMetaDir, fsMetaPath, tmpDir)
} }

Loading…
Cancel
Save