You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
554 lines
17 KiB
554 lines
17 KiB
/*
|
|
* Minio Cloud Storage, (C) 2015 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package fs
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"encoding/xml"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/minio-xl/pkg/atomic"
|
|
"github.com/minio/minio-xl/pkg/crypto/sha256"
|
|
"github.com/minio/minio-xl/pkg/crypto/sha512"
|
|
"github.com/minio/minio-xl/pkg/probe"
|
|
"github.com/minio/minio/pkg/disk"
|
|
)
|
|
|
|
func (fs Filesystem) isValidUploadID(object, uploadID string) bool {
|
|
s, ok := fs.multiparts.ActiveSession[object]
|
|
if !ok {
|
|
return false
|
|
}
|
|
if uploadID == s.UploadID {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
|
|
func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) {
|
|
fs.lock.Lock()
|
|
defer fs.lock.Unlock()
|
|
if !IsValidBucketName(bucket) {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
_, err := os.Stat(bucketPath)
|
|
// check bucket exists
|
|
if os.IsNotExist(err) {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
if err != nil {
|
|
return BucketMultipartResourcesMetadata{}, probe.NewError(InternalError{})
|
|
}
|
|
var uploads []*UploadMetadata
|
|
for object, session := range fs.multiparts.ActiveSession {
|
|
if strings.HasPrefix(object, resources.Prefix) {
|
|
if len(uploads) > resources.MaxUploads {
|
|
sort.Sort(byUploadMetadataKey(uploads))
|
|
resources.Upload = uploads
|
|
resources.NextKeyMarker = object
|
|
resources.NextUploadIDMarker = session.UploadID
|
|
resources.IsTruncated = true
|
|
return resources, nil
|
|
}
|
|
// uploadIDMarker is ignored if KeyMarker is empty
|
|
switch {
|
|
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
|
|
if object > resources.KeyMarker {
|
|
upload := new(UploadMetadata)
|
|
upload.Object = object
|
|
upload.UploadID = session.UploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
|
|
if session.UploadID > resources.UploadIDMarker {
|
|
if object >= resources.KeyMarker {
|
|
upload := new(UploadMetadata)
|
|
upload.Object = object
|
|
upload.UploadID = session.UploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
}
|
|
default:
|
|
upload := new(UploadMetadata)
|
|
upload.Object = object
|
|
upload.UploadID = session.UploadID
|
|
upload.Initiated = session.Initiated
|
|
uploads = append(uploads, upload)
|
|
}
|
|
}
|
|
}
|
|
sort.Sort(byUploadMetadataKey(uploads))
|
|
resources.Upload = uploads
|
|
return resources, nil
|
|
}
|
|
|
|
func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error {
|
|
for _, part := range parts.Part {
|
|
partFile, e := os.OpenFile(objectPath+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
|
|
defer partFile.Close()
|
|
if e != nil {
|
|
return probe.NewError(e)
|
|
}
|
|
|
|
recvMD5 := part.ETag
|
|
// complete multipart request header md5sum per part is hex encoded
|
|
// trim it and decode if possible.
|
|
_, e = hex.DecodeString(strings.Trim(recvMD5, "\""))
|
|
if e != nil {
|
|
return probe.NewError(InvalidDigest{Md5: recvMD5})
|
|
}
|
|
|
|
_, e = io.Copy(mw, partFile)
|
|
if e != nil {
|
|
return probe.NewError(e)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NewMultipartUpload - initiate a new multipart session
|
|
func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
|
|
fs.lock.Lock()
|
|
defer fs.lock.Unlock()
|
|
|
|
stfs, err := disk.Stat(fs.path)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
|
|
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
|
availableDiskSpace := (float64(stfs.Free) / (float64(stfs.Total) - (0.05 * float64(stfs.Total)))) * 100
|
|
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
|
return "", probe.NewError(RootPathFull{Path: fs.path})
|
|
}
|
|
|
|
if !IsValidBucketName(bucket) {
|
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
if !IsValidObjectName(object) {
|
|
return "", probe.NewError(ObjectNameInvalid{Object: object})
|
|
}
|
|
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
_, err = os.Stat(bucketPath)
|
|
// check bucket exists
|
|
if os.IsNotExist(err) {
|
|
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
if err != nil {
|
|
return "", probe.NewError(InternalError{})
|
|
}
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
objectDir := filepath.Dir(objectPath)
|
|
if _, err = os.Stat(objectDir); os.IsNotExist(err) {
|
|
err = os.MkdirAll(objectDir, 0700)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
}
|
|
|
|
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + object + time.Now().String())
|
|
uploadIDSum := sha512.Sum512(id)
|
|
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
|
|
|
|
multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
defer multiPartfile.Close()
|
|
|
|
mpartSession := new(MultipartSession)
|
|
mpartSession.TotalParts = 0
|
|
mpartSession.UploadID = uploadID
|
|
mpartSession.Initiated = time.Now().UTC()
|
|
var parts []*PartMetadata
|
|
mpartSession.Parts = parts
|
|
fs.multiparts.ActiveSession[object] = mpartSession
|
|
|
|
encoder := json.NewEncoder(multiPartfile)
|
|
err = encoder.Encode(mpartSession)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
if err := saveMultipartsSession(fs.multiparts); err != nil {
|
|
return "", err.Trace()
|
|
}
|
|
return uploadID, nil
|
|
}
|
|
|
|
// partNumber is a sortable interface for Part slice
|
|
type partNumber []*PartMetadata
|
|
|
|
func (a partNumber) Len() int { return len(a) }
|
|
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
|
|
|
|
// CreateObjectPart - create a part in a multipart session
|
|
func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) {
|
|
fs.lock.Lock()
|
|
defer fs.lock.Unlock()
|
|
|
|
stfs, err := disk.Stat(fs.path)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
|
|
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
|
availableDiskSpace := (float64(stfs.Free) / (float64(stfs.Total) - (0.05 * float64(stfs.Total)))) * 100
|
|
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
|
return "", probe.NewError(RootPathFull{Path: fs.path})
|
|
}
|
|
|
|
if partID <= 0 {
|
|
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
|
|
}
|
|
// check bucket name valid
|
|
if !IsValidBucketName(bucket) {
|
|
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// verify object path legal
|
|
if !IsValidObjectName(object) {
|
|
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
if strings.TrimSpace(expectedMD5Sum) != "" {
|
|
var expectedMD5SumBytes []byte
|
|
expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
|
|
if err != nil {
|
|
// pro-actively close the connection
|
|
return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum})
|
|
}
|
|
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
|
}
|
|
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, err = os.Stat(bucketPath); err != nil {
|
|
// check bucket exists
|
|
if os.IsNotExist(err) {
|
|
return "", probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
if err != nil {
|
|
return "", probe.NewError(InternalError{})
|
|
}
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
partPath := objectPath + fmt.Sprintf("$%d-$multiparts", partID)
|
|
partFile, err := atomic.FileCreateWithPrefix(partPath, "$multiparts")
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
h := md5.New()
|
|
sh := sha256.New()
|
|
mw := io.MultiWriter(partFile, h, sh)
|
|
_, err = io.CopyN(mw, data, size)
|
|
if err != nil {
|
|
partFile.CloseAndPurge()
|
|
return "", probe.NewError(err)
|
|
}
|
|
md5sum := hex.EncodeToString(h.Sum(nil))
|
|
// Verify if the written object is equal to what is expected, only if it is requested as such
|
|
if strings.TrimSpace(expectedMD5Sum) != "" {
|
|
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5sum); err != nil {
|
|
partFile.CloseAndPurge()
|
|
return "", probe.NewError(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Object: object})
|
|
}
|
|
}
|
|
if signature != nil {
|
|
ok, perr := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
|
|
if perr != nil {
|
|
partFile.CloseAndPurge()
|
|
return "", perr.Trace()
|
|
}
|
|
if !ok {
|
|
partFile.CloseAndPurge()
|
|
return "", probe.NewError(SignatureDoesNotMatch{})
|
|
}
|
|
}
|
|
partFile.Close()
|
|
|
|
fi, err := os.Stat(partPath)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
partMetadata := PartMetadata{}
|
|
partMetadata.ETag = md5sum
|
|
partMetadata.PartNumber = partID
|
|
partMetadata.Size = fi.Size()
|
|
partMetadata.LastModified = fi.ModTime()
|
|
|
|
multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR|os.O_APPEND, 0600)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
defer multiPartfile.Close()
|
|
|
|
var deserializedMultipartSession MultipartSession
|
|
decoder := json.NewDecoder(multiPartfile)
|
|
err = decoder.Decode(&deserializedMultipartSession)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata)
|
|
deserializedMultipartSession.TotalParts++
|
|
fs.multiparts.ActiveSession[object] = &deserializedMultipartSession
|
|
|
|
sort.Sort(partNumber(deserializedMultipartSession.Parts))
|
|
encoder := json.NewEncoder(multiPartfile)
|
|
err = encoder.Encode(&deserializedMultipartSession)
|
|
if err != nil {
|
|
return "", probe.NewError(err)
|
|
}
|
|
return partMetadata.ETag, nil
|
|
}
|
|
|
|
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
|
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
|
|
fs.lock.Lock()
|
|
defer fs.lock.Unlock()
|
|
|
|
// check bucket name valid
|
|
if !IsValidBucketName(bucket) {
|
|
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// verify object path legal
|
|
if !IsValidObjectName(object) {
|
|
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
if _, err := os.Stat(bucketPath); err != nil {
|
|
// check bucket exists
|
|
if os.IsNotExist(err) {
|
|
return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
return ObjectMetadata{}, probe.NewError(InternalError{})
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
file, err := atomic.FileCreateWithPrefix(objectPath, "")
|
|
if err != nil {
|
|
return ObjectMetadata{}, probe.NewError(err)
|
|
}
|
|
h := md5.New()
|
|
mw := io.MultiWriter(file, h)
|
|
|
|
partBytes, err := ioutil.ReadAll(data)
|
|
if err != nil {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(err)
|
|
}
|
|
if signature != nil {
|
|
sh := sha256.New()
|
|
sh.Write(partBytes)
|
|
ok, perr := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil)))
|
|
if perr != nil {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(err)
|
|
}
|
|
if !ok {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(SignatureDoesNotMatch{})
|
|
}
|
|
}
|
|
parts := &CompleteMultipartUpload{}
|
|
if err := xml.Unmarshal(partBytes, parts); err != nil {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(MalformedXML{})
|
|
}
|
|
if !sort.IsSorted(completedParts(parts.Part)) {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
|
|
}
|
|
|
|
if err := fs.concatParts(parts, objectPath, mw); err != nil {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, err.Trace()
|
|
}
|
|
|
|
delete(fs.multiparts.ActiveSession, object)
|
|
for _, part := range parts.Part {
|
|
err = os.Remove(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
|
|
if err != nil {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(err)
|
|
}
|
|
}
|
|
if err := os.Remove(objectPath + "$multiparts"); err != nil {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, probe.NewError(err)
|
|
}
|
|
if err := saveMultipartsSession(fs.multiparts); err != nil {
|
|
file.CloseAndPurge()
|
|
return ObjectMetadata{}, err.Trace()
|
|
}
|
|
file.Close()
|
|
|
|
st, err := os.Stat(objectPath)
|
|
if err != nil {
|
|
return ObjectMetadata{}, probe.NewError(err)
|
|
}
|
|
newObject := ObjectMetadata{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
Created: st.ModTime(),
|
|
Size: st.Size(),
|
|
ContentType: "application/octet-stream",
|
|
Md5: hex.EncodeToString(h.Sum(nil)),
|
|
}
|
|
return newObject, nil
|
|
}
|
|
|
|
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
|
|
func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) {
|
|
fs.lock.Lock()
|
|
defer fs.lock.Unlock()
|
|
|
|
// check bucket name valid
|
|
if !IsValidBucketName(bucket) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// verify object path legal
|
|
if !IsValidObjectName(object) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
if !fs.isValidUploadID(object, resources.UploadID) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
|
|
}
|
|
|
|
objectResourcesMetadata := resources
|
|
objectResourcesMetadata.Bucket = bucket
|
|
objectResourcesMetadata.Object = object
|
|
var startPartNumber int
|
|
switch {
|
|
case objectResourcesMetadata.PartNumberMarker == 0:
|
|
startPartNumber = 1
|
|
default:
|
|
startPartNumber = objectResourcesMetadata.PartNumberMarker
|
|
}
|
|
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
_, err := os.Stat(bucketPath)
|
|
// check bucket exists
|
|
if os.IsNotExist(err) {
|
|
return ObjectResourcesMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
if err != nil {
|
|
return ObjectResourcesMetadata{}, probe.NewError(InternalError{})
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDONLY, 0600)
|
|
if err != nil {
|
|
return ObjectResourcesMetadata{}, probe.NewError(err)
|
|
}
|
|
defer multiPartfile.Close()
|
|
|
|
var deserializedMultipartSession MultipartSession
|
|
decoder := json.NewDecoder(multiPartfile)
|
|
err = decoder.Decode(&deserializedMultipartSession)
|
|
if err != nil {
|
|
return ObjectResourcesMetadata{}, probe.NewError(err)
|
|
}
|
|
var parts []*PartMetadata
|
|
for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
|
|
if len(parts) > objectResourcesMetadata.MaxParts {
|
|
sort.Sort(partNumber(parts))
|
|
objectResourcesMetadata.IsTruncated = true
|
|
objectResourcesMetadata.Part = parts
|
|
objectResourcesMetadata.NextPartNumberMarker = i
|
|
return objectResourcesMetadata, nil
|
|
}
|
|
parts = append(parts, deserializedMultipartSession.Parts[i-1])
|
|
}
|
|
sort.Sort(partNumber(parts))
|
|
objectResourcesMetadata.Part = parts
|
|
return objectResourcesMetadata, nil
|
|
}
|
|
|
|
// AbortMultipartUpload - abort an incomplete multipart session
|
|
func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
|
|
fs.lock.Lock()
|
|
defer fs.lock.Unlock()
|
|
|
|
// check bucket name valid
|
|
if !IsValidBucketName(bucket) {
|
|
return probe.NewError(BucketNameInvalid{Bucket: bucket})
|
|
}
|
|
|
|
// verify object path legal
|
|
if !IsValidObjectName(object) {
|
|
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
|
}
|
|
|
|
if !fs.isValidUploadID(object, uploadID) {
|
|
return probe.NewError(InvalidUploadID{UploadID: uploadID})
|
|
}
|
|
|
|
bucketPath := filepath.Join(fs.path, bucket)
|
|
_, err := os.Stat(bucketPath)
|
|
// check bucket exists
|
|
if os.IsNotExist(err) {
|
|
return probe.NewError(BucketNotFound{Bucket: bucket})
|
|
}
|
|
if err != nil {
|
|
return probe.NewError(InternalError{})
|
|
}
|
|
|
|
objectPath := filepath.Join(bucketPath, object)
|
|
for _, part := range fs.multiparts.ActiveSession[object].Parts {
|
|
err = os.RemoveAll(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
|
|
if err != nil {
|
|
return probe.NewError(err)
|
|
}
|
|
}
|
|
delete(fs.multiparts.ActiveSession, object)
|
|
err = os.RemoveAll(objectPath + "$multiparts")
|
|
if err != nil {
|
|
return probe.NewError(err)
|
|
}
|
|
return nil
|
|
}
|
|
|