FS/multipart: Fix append-parts to use minioMetaTmpBucket. (#3304)

Branch: master
Krishna Srinivas, 8 years ago; committed by Harshavardhana
parent 11faf3f16d
commit 01ae5bb39c

Changed files:
1. cmd/fs-v1-background-append.go (41 changed lines)
2. cmd/fs-v1-multipart.go (21 changed lines)

cmd/fs-v1-background-append.go

@@ -49,9 +49,10 @@ type bgAppendPartsInput struct {
 // Identifies an appendParts go-routine.
 type bgAppendPartsInfo struct {
 	inputCh    chan bgAppendPartsInput
 	timeoutCh  chan struct{} // closed by appendParts go-routine when it timesout
-	endCh      chan struct{} // closed after complete/abort of upload to end the appendParts go-routine
+	abortCh    chan struct{} // closed after abort of upload to end the appendParts go-routine
+	completeCh chan struct{} // closed after complete of upload to end the appendParts go-routine
 }

 // Called after a part is uploaded so that it can be appended in the background.
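The core of this hunk is splitting the single endCh (closed on either completion or abort) into two channels, because the goroutine must behave differently in each case: on abort the temporary append-file is deleted, while on complete it must survive so CompleteMultipartUpload can rename it into place. Below is a minimal, self-contained sketch of this one-channel-per-shutdown-reason pattern; the names are hypothetical stand-ins, not the MinIO code:

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-in for bgAppendPartsInfo: one channel per shutdown
// reason, so the worker knows whether its temp file should be kept.
type workerInfo struct {
	inputCh    chan string
	abortCh    chan struct{} // closed on abort: delete the temp file
	completeCh chan struct{} // closed on complete: temp file is renamed by the caller
}

func worker(info workerInfo) {
	for {
		select {
		case part := <-info.inputCh:
			fmt.Println("appending", part)
		case <-info.abortCh:
			fmt.Println("abort: deleting temp file")
			return
		case <-info.completeCh:
			fmt.Println("complete: leaving temp file for rename")
			return
		case <-time.After(100 * time.Millisecond):
			fmt.Println("idle timeout: garbage collecting")
			return
		}
	}
}

func main() {
	info := workerInfo{make(chan string), make(chan struct{}), make(chan struct{})}
	go worker(info)
	info.inputCh <- "part.1"
	close(info.completeCh)            // broadcast "done" without sending a value
	time.Sleep(10 * time.Millisecond) // let the worker print before exiting
}
```

Closing a channel, rather than sending on it, broadcasts to the goroutine even if it is momentarily busy: a closed channel's case fires on the very next pass through the select.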
@@ -63,9 +64,10 @@ func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID stri
 		// part of a multipart upload is uploaded.
 		inputCh := make(chan bgAppendPartsInput)
 		timeoutCh := make(chan struct{})
-		endCh := make(chan struct{})
+		abortCh := make(chan struct{})
+		completeCh := make(chan struct{})

-		info = bgAppendPartsInfo{inputCh, timeoutCh, endCh}
+		info = bgAppendPartsInfo{inputCh, timeoutCh, abortCh, completeCh}
 		b.infoMap[uploadID] = info

 		go b.appendParts(disk, bucket, object, uploadID, info)
@@ -92,6 +94,7 @@ func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID stri
 func (b *backgroundAppend) complete(disk StorageAPI, bucket, object, uploadID string, meta fsMetaV1) error {
 	b.Lock()
 	info, ok := b.infoMap[uploadID]
+	delete(b.infoMap, uploadID)
 	b.Unlock()
 	if !ok {
 		return errPartsMissing
@@ -106,13 +109,13 @@ func (b *backgroundAppend) complete(disk StorageAPI, bucket, object, uploadID st
 	}

 	err := <-errCh
-	b.remove(uploadID)
+	close(info.completeCh)
 	return err
 }

 // Called after complete-multipart-upload or abort-multipart-upload so that the appendParts go-routine is not left dangling.
-func (b *backgroundAppend) remove(uploadID string) {
+func (b *backgroundAppend) abort(uploadID string) {
 	b.Lock()
 	defer b.Unlock()
 	info, ok := b.infoMap[uploadID]
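Two details in these hunks are worth spelling out. First, complete() now removes the infoMap entry under the lock before waiting, so a racing abort() can no longer find the entry and close abortCh after completeCh is closed. Second, complete() is a request/response handshake over channels: the caller packages a reply channel inside the input it sends, waits for the appender's final result, and only then signals exit. A minimal sketch of that shape, with hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

// request carries its own reply channel, so the caller can block for
// the worker's result before signalling it to exit.
type request struct {
	meta  string
	errCh chan error
}

func main() {
	inputCh := make(chan request)
	completeCh := make(chan struct{})

	// Worker: handle one final request, then wait for the exit signal.
	go func() {
		req := <-inputCh
		req.errCh <- errors.New("parts missing") // report the append result
		<-completeCh                             // released by the caller's close()
	}()

	errCh := make(chan error)
	inputCh <- request{meta: "fs.json", errCh: errCh}
	err := <-errCh    // block until the worker reports back
	close(completeCh) // let the worker exit; temp file is left for rename
	fmt.Println(err)  // parts missing
}
```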
@@ -120,7 +123,7 @@ func (b *backgroundAppend) remove(uploadID string) {
 		return
 	}
 	delete(b.infoMap, uploadID)
-	close(info.endCh)
+	close(info.abortCh)
 }

 // This is run as a go-routine that appends the parts in the background.
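abort() follows the standard close-once discipline: the lookup and delete happen under the mutex, so whichever of complete, abort, or timeout wins the race removes the entry, and every later caller sees !ok and returns. This matters because closing an already-closed channel panics in Go. A sketch of the pattern, again with hypothetical names:

```go
package main

import "sync"

type info struct{ abortCh chan struct{} }

// registry maps uploadID -> per-upload channels, guarded by a mutex.
type registry struct {
	sync.Mutex
	m map[string]info
}

// abort looks up and deletes the entry while holding the lock, so the
// channel is closed at most once (a second close would panic).
func (r *registry) abort(id string) {
	r.Lock()
	defer r.Unlock()
	i, ok := r.m[id]
	if !ok {
		return // already completed, aborted, or timed out
	}
	delete(r.m, id)
	close(i.abortCh)
}

func main() {
	r := &registry{m: map[string]info{"u1": {make(chan struct{})}}}
	r.abort("u1")
	r.abort("u1") // second call is a harmless no-op, not a double close
}
```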
@@ -155,10 +158,12 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID
 				}
 				appendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size)
 			}
-		case <-info.endCh:
-			// Either complete-multipart-upload or abort-multipart-upload closed endCh to end the appendParts go-routine.
-			appendFilePath := getFSAppendDataPath(uploadID)
-			disk.DeleteFile(bucket, appendFilePath)
+		case <-info.abortCh:
+			// abort-multipart-upload closed abortCh to end the appendParts go-routine.
+			disk.DeleteFile(minioMetaTmpBucket, uploadID)
+			return
+		case <-info.completeCh:
+			// complete-multipart-upload closed completeCh to end the appendParts go-routine.
 			return
 		case <-time.After(appendPartsTimeout):
 			// Timeout the goroutine to garbage collect its resources. This would happen if the client initiates
@@ -167,8 +172,7 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID
 			delete(b.infoMap, uploadID)
 			b.Unlock()
 			// Delete the temporary append file as well.
-			appendFilePath := getFSAppendDataPath(uploadID)
-			disk.DeleteFile(bucket, appendFilePath)
+			disk.DeleteFile(minioMetaTmpBucket, uploadID)

 			close(info.timeoutCh)
 		}
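The time.After case gives the goroutine a bounded idle lifetime: each pass through the select arms a fresh timer, and on expiry the goroutine unregisters itself, deletes the temp file (now addressed as minioMetaTmpBucket/uploadID), and closes timeoutCh. The sender side can then tell a live worker from a timed-out one, as in this simplified, hypothetical sketch:

```go
package main

import "fmt"

func main() {
	inputCh := make(chan string)
	timeoutCh := make(chan struct{})

	// Simulate a worker that has already timed out and closed timeoutCh.
	close(timeoutCh)

	// Selecting on both channels means a send never blocks forever on a
	// worker that has exited; the caller can spawn a fresh one instead.
	select {
	case inputCh <- "part.2":
		fmt.Println("worker accepted the part")
	case <-timeoutCh:
		fmt.Println("worker timed out; caller must recreate it")
	}
}
```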
@@ -178,8 +182,7 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID
 // Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location
 // upon complete-multipart-upload.
 func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo) error {
-	partPath := pathJoin(bucket, object, uploadID, part.Name)
-	appendFilePath := getFSAppendDataPath(uploadID)
+	partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, part.Name)

 	offset := int64(0)
 	totalLeft := part.Size
@@ -196,11 +199,11 @@ func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPar
 			// the exact size of the file and hence know the size of buf[]
 			// EOF/ErrUnexpectedEOF indicates that the length of file was shorter than part.Size and
 			// hence considered as an error condition.
-			disk.DeleteFile(bucket, appendFilePath)
+			disk.DeleteFile(minioMetaTmpBucket, uploadID)
 			return err
 		}
-		if err = disk.AppendFile(minioMetaBucket, appendFilePath, buf[:n]); err != nil {
-			disk.DeleteFile(bucket, appendFilePath)
+		if err = disk.AppendFile(minioMetaTmpBucket, uploadID, buf[:n]); err != nil {
+			disk.DeleteFile(minioMetaTmpBucket, uploadID)
 			return err
 		}
 		offset += n
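appendPart streams the part into the append-file in fixed-size chunks, and because the part's length is known from its metadata, a source shorter than part.Size is treated as an error. A rough, generic equivalent of that loop over io interfaces; this is an illustration, not the MinIO StorageAPI:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// copyChunked mirrors the appendPart loop in spirit: read the source in
// fixed-size chunks, append each chunk to the destination, and treat a
// source shorter than expectedSize (EOF/ErrUnexpectedEOF) as an error.
func copyChunked(dst io.Writer, src io.Reader, expectedSize int64, bufSize int) error {
	buf := make([]byte, bufSize)
	var total int64
	for total < expectedSize {
		n := int64(len(buf))
		if expectedSize-total < n {
			n = expectedSize - total
		}
		read, err := io.ReadFull(src, buf[:n])
		if err != nil {
			return err // source was shorter than expected
		}
		if _, err := dst.Write(buf[:read]); err != nil {
			return err
		}
		total += int64(read)
	}
	return nil
}

func main() {
	var out bytes.Buffer
	err := copyChunked(&out, bytes.NewReader(make([]byte, 1000)), 1000, 256)
	fmt.Println(out.Len(), err) // 1000 <nil>
}
```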

cmd/fs-v1-multipart.go

@@ -285,11 +285,6 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo,
 	return fsMeta.Parts[nextPartIndex], true
 }

-// Returns path for the append-file.
-func getFSAppendDataPath(uploadID string) string {
-	return path.Join(minioMetaTmpBucket, uploadID)
-}
-
 // PutObjectPart - reads incoming data until EOF for the part file on
 // an ongoing multipart transaction. Internally incoming data is
 // written to '.minio.sys/tmp' location and safely renamed to
@@ -566,21 +561,20 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
 		return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPath)
 	}

-	appendFallback := true // In case background appendRoutine() did not append the required parts.
+	appendFallback := true // In case background-append did not append the required parts.
 	if isPartsSame(fsMeta.Parts, parts) {
 		err = fs.bgAppend.complete(fs.storage, bucket, object, uploadID, fsMeta)
 		if err == nil {
 			appendFallback = false
-			fsAppendDataPath := getFSAppendDataPath(uploadID)
-			if err = fs.storage.RenameFile(minioMetaBucket, fsAppendDataPath, bucket, object); err != nil {
-				return "", toObjectErr(traceError(err), minioMetaBucket, fsAppendDataPath)
+			if err = fs.storage.RenameFile(minioMetaTmpBucket, uploadID, bucket, object); err != nil {
+				return "", toObjectErr(traceError(err), minioMetaTmpBucket, uploadID)
 			}
 		}
 	}

 	if appendFallback {
-		// appendRoutine could not append all the required parts, hence we do it here.
-		tempObj := path.Join(minioMetaTmpBucket, uploadID+"-"+"part.1")
+		// Background append could not append all the required parts, hence we do it here.
+		tempObj := uploadID + "-" + "part.1"

 		// Allocate staging buffer.
 		var buf = make([]byte, readSizeV1)
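CompleteMultipartUpload is a fast path with a fallback: only when the client's part list exactly matches what the background appender consumed (isPartsSame) is the temp file renamed directly out of minioMetaTmpBucket into place; any mismatch or error from complete() routes through the inline concatenation path instead. A condensed sketch of that decision, with the function parameters as hypothetical stand-ins for the real operations:

```go
package main

import "fmt"

// complete takes the fast path only when the background appender saw
// exactly the parts the client is completing with; any mismatch or
// error falls back to concatenating the parts inline.
func complete(partsMatch bool, bgComplete, renameTmp, concatInline func() error) error {
	fallback := true
	if partsMatch {
		if err := bgComplete(); err == nil {
			fallback = false
			if err := renameTmp(); err != nil {
				return err // rename failure is fatal, not a fallback trigger
			}
		}
	}
	if fallback {
		return concatInline()
	}
	return nil
}

func main() {
	ok := func() error { return nil }
	fmt.Println(complete(true, ok, ok, ok))  // fast path: <nil>
	fmt.Println(complete(false, ok, ok, ok)) // fallback path: <nil>
}
```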
@@ -702,9 +696,8 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
 	if err := cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil {
 		return err
 	}
-	fs.bgAppend.remove(uploadID)
-
-	// remove entry from uploads.json with quorum
+	fs.bgAppend.abort(uploadID)
+	// remove upload ID in uploads.json
 	if err := fs.removeUploadID(bucket, object, uploadID); err != nil {
 		return toObjectErr(err, bucket, object)
 	}
