XL/Multipart: maintain the parts info in multipart.json after complete-multipart-upload. (#1436)

master
Krishna Srinivas 9 years ago committed by Harshavardhana
parent 443ec37765
commit 0c27d8e5b1
  1. 2
      fs-objects-multipart.go
  2. 22
      xl-objects-multipart.go
  3. 62
      xl-objects.go

@ -65,7 +65,7 @@ func (fs fsObjects) listLeafEntries(prefixPath string) (entries []FileInfo, e er
return entries, nil return entries, nil
} }
// listMetaVolumeFiles - list all files at a given prefix inside minioMetaVolume. // listMetaVolumeFiles - list all files at a given prefix inside MetaVolume.
func (fs fsObjects) listMetaVolumeFiles(prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, err error) { func (fs fsObjects) listMetaVolumeFiles(prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, err error) {
// newMaxKeys tracks the size of entries which are going to be // newMaxKeys tracks the size of entries which are going to be
// returned back. // returned back.

@ -19,6 +19,7 @@ package main
import ( import (
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -274,6 +275,7 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string) (string, error) {
} }
} }
} }
for { for {
uuid, err := uuid.New() uuid, err := uuid.New()
if err != nil { if err != nil {
@ -460,6 +462,26 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
md5Sums = append(md5Sums, part.ETag) md5Sums = append(md5Sums, part.ETag)
} }
if w, err := xl.storage.CreateFile(bucket, pathJoin(object, multipartMetaFile)); err == nil {
var b []byte
b, err = json.Marshal(parts)
if err != nil {
return "", err
}
_, err = w.Write(b)
if err != nil {
return "", err
}
// Close the writer.
if err = w.Close(); err != nil {
return "", err
}
} else {
return "", toObjectErr(err, bucket, object)
}
if err := xl.storage.DeleteFile(minioMetaVolume, path.Join(bucket, object, uploadID)); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Save the s3 md5. // Save the s3 md5.
s3MD5, err := makeS3MD5(md5Sums...) s3MD5, err := makeS3MD5(md5Sums...)
if err != nil { if err != nil {

@ -19,7 +19,7 @@ package main
import ( import (
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"errors" "encoding/json"
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
@ -72,25 +72,22 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// GetObject - get an object. // GetObject - get an object.
func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) { func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) {
findPathOffset := func() (i int, partOffset int64, err error) { findPartOffset := func(parts completedParts) (partIndex int, partOffset int64, err error) {
partOffset = startOffset partOffset = startOffset
for i = 1; i < 10000; i++ { for i, part := range parts {
partIndex = i
var fileInfo FileInfo var fileInfo FileInfo
fileInfo, err = xl.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(i))) fileInfo, err = xl.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber)))
if err != nil { if err != nil {
if err == errFileNotFound {
continue
}
return return
} }
if partOffset < fileInfo.Size { if partOffset < fileInfo.Size {
return return
} }
partOffset -= fileInfo.Size partOffset -= fileInfo.Size
} }
err = errors.New("offset too high") // Offset beyond the size of the object
err = errUnexpected
return return
} }
@ -114,17 +111,19 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
fileReader, fileWriter := io.Pipe() fileReader, fileWriter := io.Pipe()
partNum, offset, err := findPathOffset() parts, err := xl.getParts(bucket, object)
if err != nil { if err != nil {
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
go func() { partIndex, offset, err := findPartOffset(parts)
for ; partNum < 10000; partNum++ {
r, err := xl.storage.ReadFile(bucket, pathJoin(object, fmt.Sprint(partNum)), offset)
if err != nil { if err != nil {
if err == errFileNotFound { return nil, toObjectErr(err, bucket, object)
continue
} }
go func() {
for ; partIndex < len(parts); partIndex++ {
part := parts[partIndex]
r, err := xl.storage.ReadFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber)), offset)
if err != nil {
fileWriter.CloseWithError(err) fileWriter.CloseWithError(err)
return return
} }
@ -138,11 +137,32 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
return fileReader, nil return fileReader, nil
} }
// Return the parts of a multipart upload.
func (xl xlObjects) getParts(bucket, object string) (parts completedParts, err error) {
offset := int64(0)
r, err := xl.storage.ReadFile(bucket, pathJoin(object, multipartMetaFile), offset)
if err != nil {
return
}
// FIXME: what if multipart.json is > 4MB
b := make([]byte, 4*1024*1024)
n, err := io.ReadFull(r, b)
if err != nil && err != io.ErrUnexpectedEOF {
return
}
b = b[:n]
err = json.Unmarshal(b, &parts)
if err != nil {
return
}
return
}
// GetObjectInfo - get object info. // GetObjectInfo - get object info.
func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
getMultpartFileSize := func() (size int64) { getMultpartFileSize := func(parts completedParts) (size int64) {
for i := 0; i < 10000; i++ { for _, part := range parts {
fi, err := xl.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(i))) fi, err := xl.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(part.PartNumber)))
if err != nil { if err != nil {
continue continue
} }
@ -160,11 +180,11 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
} }
fi, err := xl.storage.StatFile(bucket, object) fi, err := xl.storage.StatFile(bucket, object)
if err != nil { if err != nil {
fi, err = xl.storage.StatFile(bucket, pathJoin(object, multipartMetaFile)) parts, err := xl.getParts(bucket, object)
if err != nil { if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
fi.Size = getMultpartFileSize() fi.Size = getMultpartFileSize(parts)
} }
contentType := "application/octet-stream" contentType := "application/octet-stream"
if objectExt := filepath.Ext(object); objectExt != "" { if objectExt := filepath.Ext(object); objectExt != "" {

Loading…
Cancel
Save