You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
702 lines
22 KiB
702 lines
22 KiB
/*
|
|
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package fs
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/xml"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/minio/pkg/atomic"
|
|
"github.com/minio/minio/pkg/crypto/sha256"
|
|
"github.com/minio/minio/pkg/crypto/sha512"
|
|
"github.com/minio/minio/pkg/disk"
|
|
"github.com/minio/minio/pkg/mimedb"
|
|
"github.com/minio/minio/pkg/probe"
|
|
"github.com/minio/minio/pkg/s3/signature4"
|
|
)
|
|
|
|
// isValidUploadID - is upload id.
|
|
func (fs Filesystem) isValidUploadID(object, uploadID string) (ok bool) {
|
|
fs.rwLock.RLock()
|
|
defer fs.rwLock.RUnlock()
|
|
_, ok = fs.multiparts.ActiveSession[uploadID]
|
|
if !ok {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// byObjectMetadataKey is a sortable interface for UploadMetadata slice
|
|
type byUploadMetadataKey []*UploadMetadata
|
|
|
|
func (b byUploadMetadataKey) Len() int { return len(b) }
|
|
func (b byUploadMetadataKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
|
func (b byUploadMetadataKey) Less(i, j int) bool { return b[i].Object < b[j].Object }
|
|
|
|
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
|
|
func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
|
|
// Input validation.
|
|
if !IsValidBucketName(bucket) {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(e)
|
|
}
|
|
var uploads []*UploadMetadata
|
|
fs.rwLock.RLock()
|
|
defer fs.rwLock.RUnlock()
|
|
for uploadID, session := range fs.multiparts.ActiveSession {
|
|
objectName := session.ObjectName
|
|
if strings.HasPrefix(objectName, resources.Prefix) {
|
|
if len(uploads) > resources.MaxUploads {
|
|
sort.Sort(byUploadMetadataKey(uploads))
|
|
resources.Upload = uploads
|
|
resources.NextKeyMarker = session.ObjectName
|
|
resources.NextUploadIDMarker = uploadID
|
|
resources.IsTruncated = true
|
|
return resources, nil
|
|
}
|
|
// UploadIDMarker is ignored if KeyMarker is empty.
|
|
switch {
|
|
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
|
|
if objectName > resources.KeyMarker {
|
|
upload := new(UploadMetadata)
|
|
upload.Object = objectName
|
|
upload.UploadID = uploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
|
|
if session.UploadID > resources.UploadIDMarker {
|
|
if objectName >= resources.KeyMarker {
|
|
upload := new(UploadMetadata)
|
|
upload.Object = objectName
|
|
upload.UploadID = uploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
}
|
|
default:
|
|
upload := new(UploadMetadata)
|
|
upload.Object = objectName
|
|
upload.UploadID = uploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
}
|
|
}
|
|
sort.Sort(byUploadMetadataKey(uploads))
|
|
resources.Upload = uploads
|
|
return resources, nil
|
|
}
|
|
|
|
// verify if parts sent over the network do really match with what we
|
|
// have for the session.
|
|
func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
|
|
if parts == nil || savedParts == nil {
|
|
return false
|
|
}
|
|
if len(parts) != len(savedParts) {
|
|
return false
|
|
}
|
|
// Range of incoming parts and compare them with saved parts.
|
|
for i, part := range parts {
|
|
if strings.Trim(part.ETag, "\"") != savedParts[i].ETag {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Create an s3 compatible MD5sum for complete multipart transaction.
|
|
func makeS3MD5(md5Strs ...string) (string, *probe.Error) {
|
|
var finalMD5Bytes []byte
|
|
for _, md5Str := range md5Strs {
|
|
md5Bytes, e := hex.DecodeString(md5Str)
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
|
|
}
|
|
md5Hasher := md5.New()
|
|
md5Hasher.Write(finalMD5Bytes)
|
|
s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs))
|
|
return s3MD5, nil
|
|
}
|
|
|
|
type multiCloser struct {
|
|
Closers []io.Closer
|
|
}
|
|
|
|
func (m multiCloser) Close() error {
|
|
for _, c := range m.Closers {
|
|
if e := c.Close(); e != nil {
|
|
return e
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MultiCloser - returns a Closer that's the logical
|
|
// concatenation of the provided input closers. They're closed
|
|
// sequentially. If any of the closers return a non-nil error, Close
|
|
// will return that error.
|
|
func MultiCloser(closers ...io.Closer) io.Closer {
|
|
return multiCloser{closers}
|
|
}
|
|
|
|
// removeParts - remove all parts.
|
|
func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error {
|
|
for _, part := range parts {
|
|
// We are on purpose ignoring the return values here, since
|
|
// another thread would have purged these entries.
|
|
os.Remove(partPathPrefix + part.ETag + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// saveParts - concantenate and save all parts.
|
|
func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe.Error {
|
|
var partReaders []io.Reader
|
|
var partClosers []io.Closer
|
|
for _, part := range parts {
|
|
// Trim prefix
|
|
md5Sum := strings.TrimPrefix(part.ETag, "\"")
|
|
// Trim suffix
|
|
md5Sum = strings.TrimSuffix(md5Sum, "\"")
|
|
partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
|
|
if e != nil {
|
|
if !os.IsNotExist(e) {
|
|
return probe.NewError(e)
|
|
}
|
|
// Some clients do not set Content-Md5, so we would have
|
|
// created part files without 'ETag' in them.
|
|
partFile, e = os.OpenFile(partPathPrefix+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
|
|
if e != nil {
|
|
return probe.NewError(e)
|
|
}
|
|
}
|
|
partReaders = append(partReaders, partFile)
|
|
partClosers = append(partClosers, partFile)
|
|
}
|
|
// Concatenate a list of closers and close upon return.
|
|
closer := MultiCloser(partClosers...)
|
|
defer closer.Close()
|
|
|
|
reader := io.MultiReader(partReaders...)
|
|
readBufferSize := 8 * 1024 * 1024 // 8MiB
|
|
readBuffer := make([]byte, readBufferSize) // Allocate 8MiB buffer.
|
|
if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil {
|
|
return probe.NewError(e)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NewMultipartUpload - initiate a new multipart session
|
|
func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
|
|
di, e := disk.GetInfo(fs.path)
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
|
|
// Remove 5% from total space for cumulative disk space used for
|
|
// journalling, inodes etc.
|
|
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
|
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
|
return "", probe.NewError(RootPathFull{Path: fs.path})
|
|
}
|
|
|
|
// Input validation.
|
|
if !IsValidBucketName(bucket) {
|
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
if !IsValidObjectName(object) {
|
|
return "", probe.NewError(ObjectNameInvalid{Object: object})
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e = os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return "", probe.NewError(e)
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
objectDir := filepath.Dir(objectPath)
|
|
if _, e = os.Stat(objectDir); e != nil {
|
|
if !os.IsNotExist(e) {
|
|
return "", probe.NewError(e)
|
|
}
|
|
e = os.MkdirAll(objectDir, 0700)
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
}
|
|
|
|
// Generate new upload id.
|
|
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + object + time.Now().String())
|
|
uploadIDSum := sha512.Sum512(id)
|
|
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
|
|
|
|
// Critical region requiring write lock.
|
|
fs.rwLock.Lock()
|
|
defer fs.rwLock.Unlock()
|
|
// Initialize multipart session.
|
|
mpartSession := &MultipartSession{}
|
|
mpartSession.TotalParts = 0
|
|
mpartSession.ObjectName = object
|
|
mpartSession.UploadID = uploadID
|
|
mpartSession.Initiated = time.Now().UTC()
|
|
// Multipart has maximum of 10000 parts.
|
|
var parts []PartMetadata
|
|
mpartSession.Parts = parts
|
|
|
|
fs.multiparts.ActiveSession[uploadID] = mpartSession
|
|
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
|
return "", err.Trace(objectPath)
|
|
}
|
|
return uploadID, nil
|
|
}
|
|
|
|
// Remove all duplicated parts based on the latest time of their upload.
|
|
func removeDuplicateParts(parts []PartMetadata) []PartMetadata {
|
|
length := len(parts) - 1
|
|
for i := 0; i < length; i++ {
|
|
for j := i + 1; j <= length; j++ {
|
|
if parts[i].PartNumber == parts[j].PartNumber {
|
|
if parts[i].LastModified.Sub(parts[j].LastModified) > 0 {
|
|
parts[i] = parts[length]
|
|
} else {
|
|
parts[j] = parts[length]
|
|
}
|
|
parts = parts[0:length]
|
|
length--
|
|
j--
|
|
}
|
|
}
|
|
}
|
|
return parts
|
|
}
|
|
|
|
// partNumber is a sortable interface for Part slice.
|
|
type partNumber []PartMetadata
|
|
|
|
func (a partNumber) Len() int { return len(a) }
|
|
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
|
|
|
|
// CreateObjectPart - create a part in a multipart session
|
|
func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *signature4.Sign) (string, *probe.Error) {
|
|
di, err := disk.GetInfo(fs.path)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
|
|
// Remove 5% from total space for cumulative disk space used for
|
|
// journalling, inodes etc.
|
|
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
|
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
|
return "", probe.NewError(RootPathFull{Path: fs.path})
|
|
}
|
|
|
|
// Part id cannot be negative.
|
|
if partID <= 0 {
|
|
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
|
|
}
|
|
|
|
// Check bucket name valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// Verify object path legal.
|
|
if !IsValidObjectName(object) {
|
|
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
// Verify upload is valid for the incoming object.
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
if expectedMD5Sum != "" {
|
|
var expectedMD5SumBytes []byte
|
|
expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(expectedMD5Sum)
|
|
if err != nil {
|
|
// Pro-actively close the connection
|
|
return "", probe.NewError(InvalidDigest{MD5: expectedMD5Sum})
|
|
}
|
|
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, err = os.Stat(bucketPath); err != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(err) {
|
|
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return "", probe.NewError(err)
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
partPathPrefix := objectPath + uploadID
|
|
partPath := partPathPrefix + expectedMD5Sum + fmt.Sprintf("$%d-$multiparts", partID)
|
|
partFile, e := atomic.FileCreateWithPrefix(partPath, "$multiparts")
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
|
|
md5Hasher := md5.New()
|
|
sha256Hasher := sha256.New()
|
|
partWriter := io.MultiWriter(partFile, md5Hasher, sha256Hasher)
|
|
if _, e = io.CopyN(partWriter, data, size); e != nil {
|
|
partFile.CloseAndPurge()
|
|
return "", probe.NewError(e)
|
|
}
|
|
|
|
md5sum := hex.EncodeToString(md5Hasher.Sum(nil))
|
|
// Verify if the written object is equal to what is expected, only
|
|
// if it is requested as such.
|
|
if expectedMD5Sum != "" {
|
|
if !isMD5SumEqual(expectedMD5Sum, md5sum) {
|
|
partFile.CloseAndPurge()
|
|
return "", probe.NewError(BadDigest{MD5: expectedMD5Sum, Bucket: bucket, Object: object})
|
|
}
|
|
}
|
|
if signature != nil {
|
|
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256Hasher.Sum(nil)))
|
|
if err != nil {
|
|
partFile.CloseAndPurge()
|
|
return "", err.Trace()
|
|
}
|
|
if !ok {
|
|
partFile.CloseAndPurge()
|
|
return "", probe.NewError(SignDoesNotMatch{})
|
|
}
|
|
}
|
|
partFile.Close()
|
|
|
|
fi, e := os.Stat(partPath)
|
|
if e != nil {
|
|
return "", probe.NewError(e)
|
|
}
|
|
partMetadata := PartMetadata{}
|
|
partMetadata.ETag = md5sum
|
|
partMetadata.PartNumber = partID
|
|
partMetadata.Size = fi.Size()
|
|
partMetadata.LastModified = fi.ModTime()
|
|
|
|
// Critical region requiring read lock.
|
|
fs.rwLock.RLock()
|
|
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
|
|
fs.rwLock.RUnlock()
|
|
if !ok {
|
|
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
// Add all incoming parts.
|
|
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata)
|
|
|
|
// Remove duplicate parts based on the most recent uploaded.
|
|
deserializedMultipartSession.Parts = removeDuplicateParts(deserializedMultipartSession.Parts)
|
|
// Save total parts uploaded.
|
|
deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
|
|
|
|
// Sort by part number before saving.
|
|
sort.Sort(partNumber(deserializedMultipartSession.Parts))
|
|
|
|
// Critical region requiring write lock.
|
|
fs.rwLock.Lock()
|
|
defer fs.rwLock.Unlock()
|
|
|
|
fs.multiparts.ActiveSession[uploadID] = deserializedMultipartSession
|
|
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
|
return "", err.Trace(partPathPrefix)
|
|
}
|
|
|
|
// Return etag.
|
|
return partMetadata.ETag, nil
|
|
}
|
|
|
|
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
|
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *signature4.Sign) (ObjectMetadata, *probe.Error) {
|
|
// Check bucket name is valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// Verify object path is legal.
|
|
if !IsValidObjectName(object) {
|
|
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
// Verify if valid upload for incoming object.
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return ObjectMetadata{}, probe.NewError(InternalError{})
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
objectWriter, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
|
|
if e != nil {
|
|
return ObjectMetadata{}, probe.NewError(e)
|
|
}
|
|
|
|
partBytes, e := ioutil.ReadAll(data)
|
|
if e != nil {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(e)
|
|
}
|
|
if signature != nil {
|
|
sh := sha256.New()
|
|
sh.Write(partBytes)
|
|
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
|
|
if err != nil {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, err.Trace()
|
|
}
|
|
if !ok {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(SignDoesNotMatch{})
|
|
}
|
|
}
|
|
completeMultipartUpload := &CompleteMultipartUpload{}
|
|
if e = xml.Unmarshal(partBytes, completeMultipartUpload); e != nil {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(MalformedXML{})
|
|
}
|
|
if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
|
|
}
|
|
|
|
// Save parts for verification.
|
|
parts := completeMultipartUpload.Part
|
|
|
|
// Critical region requiring read lock.
|
|
fs.rwLock.RLock()
|
|
savedParts := fs.multiparts.ActiveSession[uploadID].Parts
|
|
fs.rwLock.RUnlock()
|
|
|
|
if !doPartsMatch(parts, savedParts) {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(InvalidPart{})
|
|
}
|
|
|
|
// Parts successfully validated, save all the parts.
|
|
partPathPrefix := objectPath + uploadID
|
|
if err := saveParts(partPathPrefix, objectWriter, parts); err != nil {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, err.Trace(partPathPrefix)
|
|
}
|
|
var md5Strs []string
|
|
for _, part := range savedParts {
|
|
md5Strs = append(md5Strs, part.ETag)
|
|
}
|
|
// Save the s3 md5.
|
|
s3MD5, err := makeS3MD5(md5Strs...)
|
|
if err != nil {
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, err.Trace(md5Strs...)
|
|
}
|
|
|
|
// Successfully saved multipart, remove all parts in a routine.
|
|
go removeParts(partPathPrefix, savedParts)
|
|
|
|
// Critical region requiring write lock.
|
|
fs.rwLock.Lock()
|
|
delete(fs.multiparts.ActiveSession, uploadID)
|
|
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
|
fs.rwLock.Unlock()
|
|
objectWriter.CloseAndPurge()
|
|
return ObjectMetadata{}, err.Trace(partPathPrefix)
|
|
}
|
|
if e = objectWriter.Close(); e != nil {
|
|
fs.rwLock.Unlock()
|
|
return ObjectMetadata{}, probe.NewError(e)
|
|
}
|
|
fs.rwLock.Unlock()
|
|
|
|
// Send stat again to get object metadata.
|
|
st, e := os.Stat(objectPath)
|
|
if e != nil {
|
|
return ObjectMetadata{}, probe.NewError(e)
|
|
}
|
|
|
|
contentType := "application/octet-stream"
|
|
if objectExt := filepath.Ext(objectPath); objectExt != "" {
|
|
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
|
|
if ok {
|
|
contentType = content.ContentType
|
|
}
|
|
}
|
|
newObject := ObjectMetadata{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
LastModified: st.ModTime(),
|
|
Size: st.Size(),
|
|
ContentType: contentType,
|
|
MD5: s3MD5,
|
|
}
|
|
return newObject, nil
|
|
}
|
|
|
|
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
|
|
func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
|
|
// Check bucket name is valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// Verify object path legal.
|
|
if !IsValidObjectName(object) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
// Save upload id.
|
|
uploadID := resources.UploadID
|
|
|
|
// Verify if upload id is valid for incoming object.
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
objectResourcesMetadata := resources
|
|
objectResourcesMetadata.Bucket = bucket
|
|
objectResourcesMetadata.Object = object
|
|
var startPartNumber int
|
|
switch {
|
|
case objectResourcesMetadata.PartNumberMarker == 0:
|
|
startPartNumber = 1
|
|
default:
|
|
startPartNumber = objectResourcesMetadata.PartNumberMarker
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return ObjectResourcesMetadata{}, probe.NewError(e)
|
|
}
|
|
|
|
// Critical region requiring read lock.
|
|
fs.rwLock.RLock()
|
|
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
|
|
fs.rwLock.RUnlock()
|
|
if !ok {
|
|
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
|
|
}
|
|
var parts []PartMetadata
|
|
for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
|
|
if len(parts) > objectResourcesMetadata.MaxParts {
|
|
sort.Sort(partNumber(parts))
|
|
objectResourcesMetadata.IsTruncated = true
|
|
objectResourcesMetadata.Part = parts
|
|
objectResourcesMetadata.NextPartNumberMarker = i
|
|
return objectResourcesMetadata, nil
|
|
}
|
|
parts = append(parts, deserializedMultipartSession.Parts[i-1])
|
|
}
|
|
sort.Sort(partNumber(parts))
|
|
objectResourcesMetadata.Part = parts
|
|
return objectResourcesMetadata, nil
|
|
}
|
|
|
|
// AbortMultipartUpload - abort an incomplete multipart session
|
|
func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
|
|
// Check bucket name valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// Verify object path legal.
|
|
if !IsValidObjectName(object) {
|
|
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
bucket = fs.denormalizeBucket(bucket)
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, e := os.Stat(bucketPath); e != nil {
|
|
// Check bucket exists.
|
|
if os.IsNotExist(e) {
|
|
return probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return probe.NewError(e)
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
partPathPrefix := objectPath + uploadID
|
|
|
|
// Critical region requiring read lock.
|
|
fs.rwLock.RLock()
|
|
savedParts := fs.multiparts.ActiveSession[uploadID].Parts
|
|
fs.rwLock.RUnlock()
|
|
|
|
// Remove all parts.
|
|
if err := removeParts(partPathPrefix, savedParts); err != nil {
|
|
return err.Trace(partPathPrefix)
|
|
}
|
|
|
|
// Critical region requiring write lock.
|
|
fs.rwLock.Lock()
|
|
defer fs.rwLock.Unlock()
|
|
|
|
delete(fs.multiparts.ActiveSession, uploadID)
|
|
if err := saveMultipartsSession(*fs.multiparts); err != nil {
|
|
return err.Trace(partPathPrefix)
|
|
}
|
|
return nil
|
|
}
|
|
|