From 01ae5bb39c8f87cc3061325e89514572a633f1de Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Wed, 23 Nov 2016 16:34:04 +0530 Subject: [PATCH] FS/multipart: Fix append-parts to use minioMetaTmpBucket. (#3304) --- cmd/fs-v1-background-append.go | 41 ++++++++++++++++++---------------- cmd/fs-v1-multipart.go | 21 ++++++----------- 2 files changed, 29 insertions(+), 33 deletions(-) diff --git a/cmd/fs-v1-background-append.go b/cmd/fs-v1-background-append.go index c9efbfce2..c7cbc626d 100644 --- a/cmd/fs-v1-background-append.go +++ b/cmd/fs-v1-background-append.go @@ -49,9 +49,10 @@ type bgAppendPartsInput struct { // Identifies an appendParts go-routine. type bgAppendPartsInfo struct { - inputCh chan bgAppendPartsInput - timeoutCh chan struct{} // closed by appendParts go-routine when it timesout - endCh chan struct{} // closed after complete/abort of upload to end the appendParts go-routine + inputCh chan bgAppendPartsInput + timeoutCh chan struct{} // closed by appendParts go-routine when it timesout + abortCh chan struct{} // closed after abort of upload to end the appendParts go-routine + completeCh chan struct{} // closed after complete of upload to end the appendParts go-routine } // Called after a part is uploaded so that it can be appended in the background. @@ -63,9 +64,10 @@ func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID stri // part of a multipart upload is uploaded. inputCh := make(chan bgAppendPartsInput) timeoutCh := make(chan struct{}) - endCh := make(chan struct{}) + abortCh := make(chan struct{}) + completeCh := make(chan struct{}) - info = bgAppendPartsInfo{inputCh, timeoutCh, endCh} + info = bgAppendPartsInfo{inputCh, timeoutCh, abortCh, completeCh} b.infoMap[uploadID] = info go b.appendParts(disk, bucket, object, uploadID, info) @@ -92,6 +94,7 @@ func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID stri func (b *backgroundAppend) complete(disk StorageAPI, bucket, object, uploadID string, meta fsMetaV1) error { b.Lock() info, ok := b.infoMap[uploadID] + delete(b.infoMap, uploadID) b.Unlock() if !ok { return errPartsMissing @@ -106,13 +109,13 @@ func (b *backgroundAppend) complete(disk StorageAPI, bucket, object, uploadID st } err := <-errCh - b.remove(uploadID) + close(info.completeCh) return err } // Called after complete-multipart-upload or abort-multipart-upload so that the appendParts go-routine is not left dangling. -func (b *backgroundAppend) remove(uploadID string) { +func (b *backgroundAppend) abort(uploadID string) { b.Lock() defer b.Unlock() info, ok := b.infoMap[uploadID] @@ -120,7 +123,7 @@ func (b *backgroundAppend) remove(uploadID string) { return } delete(b.infoMap, uploadID) - close(info.endCh) + close(info.abortCh) } // This is run as a go-routine that appends the parts in the background. @@ -155,10 +158,12 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID } appendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size) } - case <-info.endCh: - // Either complete-multipart-upload or abort-multipart-upload closed endCh to end the appendParts go-routine. - appendFilePath := getFSAppendDataPath(uploadID) - disk.DeleteFile(bucket, appendFilePath) + case <-info.abortCh: + // abort-multipart-upload closed abortCh to end the appendParts go-routine. + disk.DeleteFile(minioMetaTmpBucket, uploadID) + return + case <-info.completeCh: + // complete-multipart-upload closed completeCh to end the appendParts go-routine. return case <-time.After(appendPartsTimeout): // Timeout the goroutine to garbage collect its resources. This would happen if the client initiates @@ -167,8 +172,7 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID delete(b.infoMap, uploadID) b.Unlock() // Delete the temporary append file as well. - appendFilePath := getFSAppendDataPath(uploadID) - disk.DeleteFile(bucket, appendFilePath) + disk.DeleteFile(minioMetaTmpBucket, uploadID) close(info.timeoutCh) } @@ -178,8 +182,7 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID // Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location // upon complete-multipart-upload. func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo) error { - partPath := pathJoin(bucket, object, uploadID, part.Name) - appendFilePath := getFSAppendDataPath(uploadID) + partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, part.Name) offset := int64(0) totalLeft := part.Size @@ -196,11 +199,11 @@ func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPar // the exact size of the file and hence know the size of buf[] // EOF/ErrUnexpectedEOF indicates that the length of file was shorter than part.Size and // hence considered as an error condition. - disk.DeleteFile(bucket, appendFilePath) + disk.DeleteFile(minioMetaTmpBucket, uploadID) return err } - if err = disk.AppendFile(minioMetaBucket, appendFilePath, buf[:n]); err != nil { - disk.DeleteFile(bucket, appendFilePath) + if err = disk.AppendFile(minioMetaTmpBucket, uploadID, buf[:n]); err != nil { + disk.DeleteFile(minioMetaTmpBucket, uploadID) return err } offset += n diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 0a7599a9d..d8543b189 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -285,11 +285,6 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo, return fsMeta.Parts[nextPartIndex], true } -// Returns path for the append-file. -func getFSAppendDataPath(uploadID string) string { - return path.Join(minioMetaTmpBucket, uploadID) -} - // PutObjectPart - reads incoming data until EOF for the part file on // an ongoing multipart transaction. Internally incoming data is // written to '.minio.sys/tmp' location and safely renamed to @@ -566,21 +561,20 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPath) } - appendFallback := true // In case background appendRoutine() did not append the required parts. + appendFallback := true // In case background-append did not append the required parts. if isPartsSame(fsMeta.Parts, parts) { err = fs.bgAppend.complete(fs.storage, bucket, object, uploadID, fsMeta) if err == nil { appendFallback = false - fsAppendDataPath := getFSAppendDataPath(uploadID) - if err = fs.storage.RenameFile(minioMetaBucket, fsAppendDataPath, bucket, object); err != nil { - return "", toObjectErr(traceError(err), minioMetaBucket, fsAppendDataPath) + if err = fs.storage.RenameFile(minioMetaTmpBucket, uploadID, bucket, object); err != nil { + return "", toObjectErr(traceError(err), minioMetaTmpBucket, uploadID) } } } if appendFallback { - // appendRoutine could not do append all the required parts, hence we do it here. - tempObj := path.Join(minioMetaTmpBucket, uploadID+"-"+"part.1") + // background append could not do append all the required parts, hence we do it here. + tempObj := uploadID + "-" + "part.1" // Allocate staging buffer. var buf = make([]byte, readSizeV1) @@ -702,9 +696,8 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error if err := cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil { return err } - fs.bgAppend.remove(uploadID) - - // remove upload ID in uploads.json + fs.bgAppend.abort(uploadID) + // remove entry from uploads.json with quorum if err := fs.removeUploadID(bucket, object, uploadID); err != nil { return toObjectErr(err, bucket, object) }