You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
780 lines
28 KiB
780 lines
28 KiB
/*
|
|
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package s3
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/minio-go/v6/pkg/encrypt"
|
|
minio "github.com/minio/minio/cmd"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
)
|
|
|
|
const (
|
|
// name of custom multipart metadata file for s3 backend.
|
|
gwdareMetaJSON string = "dare.meta"
|
|
|
|
// name of temporary per part metadata file
|
|
gwpartMetaJSON string = "part.meta"
|
|
// custom multipart files are stored under the defaultMinioGWPrefix
|
|
defaultMinioGWPrefix = ".minio"
|
|
defaultGWContentFileName = "data"
|
|
)
|
|
|
|
// s3EncObjects is a wrapper around s3Objects and implements gateway calls for
|
|
// custom large objects encrypted at the gateway
|
|
type s3EncObjects struct {
|
|
s3Objects
|
|
}
|
|
|
|
/*
|
|
NOTE:
|
|
Custom gateway encrypted objects are stored on backend as follows:
|
|
obj/.minio/data <= encrypted content
|
|
obj/.minio/dare.meta <= metadata
|
|
|
|
When a multipart upload operation is in progress, the metadata set during
|
|
NewMultipartUpload is stored in obj/.minio/uploadID/dare.meta and each
|
|
UploadPart operation saves additional state of the part's encrypted ETag and
|
|
encrypted size in obj/.minio/uploadID/part1/part.meta
|
|
|
|
All the part metadata and temp dare.meta are cleaned up when upload completes
|
|
*/
|
|
|
|
// ListObjects lists all blobs in S3 bucket filtered by prefix
|
|
func (l *s3EncObjects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, e error) {
|
|
var continuationToken, startAfter string
|
|
res, err := l.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, false, startAfter)
|
|
if err != nil {
|
|
return loi, err
|
|
}
|
|
loi.IsTruncated = res.IsTruncated
|
|
loi.NextMarker = res.NextContinuationToken
|
|
loi.Objects = res.Objects
|
|
loi.Prefixes = res.Prefixes
|
|
return loi, nil
|
|
|
|
}
|
|
|
|
// ListObjectsV2 lists all blobs in S3 bucket filtered by prefix
|
|
func (l *s3EncObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, e error) {
|
|
|
|
var objects []minio.ObjectInfo
|
|
var prefixes []string
|
|
var isTruncated bool
|
|
|
|
// filter out objects that contain a .minio prefix, but is not a dare.meta metadata file.
|
|
for {
|
|
loi, e = l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, fetchOwner, startAfter)
|
|
if e != nil {
|
|
return loi, minio.ErrorRespToObjectError(e, bucket)
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
startAfter = obj.Name
|
|
continuationToken = loi.NextContinuationToken
|
|
isTruncated = loi.IsTruncated
|
|
|
|
if !isGWObject(obj.Name) {
|
|
continue
|
|
}
|
|
// get objectname and ObjectInfo from the custom metadata file
|
|
if strings.HasSuffix(obj.Name, gwdareMetaJSON) {
|
|
objSlice := strings.Split(obj.Name, minio.SlashSeparator+defaultMinioGWPrefix)
|
|
gwMeta, e := l.getGWMetadata(ctx, bucket, getDareMetaPath(objSlice[0]))
|
|
if e != nil {
|
|
continue
|
|
}
|
|
oInfo := gwMeta.ToObjectInfo(bucket, objSlice[0])
|
|
objects = append(objects, oInfo)
|
|
} else {
|
|
objects = append(objects, obj)
|
|
}
|
|
if len(objects) > maxKeys {
|
|
break
|
|
}
|
|
}
|
|
for _, p := range loi.Prefixes {
|
|
objName := strings.TrimSuffix(p, minio.SlashSeparator)
|
|
gm, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(objName))
|
|
// if prefix is actually a custom multi-part object, append it to objects
|
|
if err == nil {
|
|
objects = append(objects, gm.ToObjectInfo(bucket, objName))
|
|
continue
|
|
}
|
|
isPrefix := l.isPrefix(ctx, bucket, p, fetchOwner, startAfter)
|
|
if isPrefix {
|
|
prefixes = append(prefixes, p)
|
|
}
|
|
}
|
|
if (len(objects) > maxKeys) || !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
|
|
loi.IsTruncated = isTruncated
|
|
loi.ContinuationToken = continuationToken
|
|
loi.Objects = make([]minio.ObjectInfo, 0)
|
|
loi.Prefixes = make([]string, 0)
|
|
loi.Objects = append(loi.Objects, objects...)
|
|
|
|
for _, pfx := range prefixes {
|
|
if pfx != prefix {
|
|
loi.Prefixes = append(loi.Prefixes, pfx)
|
|
}
|
|
}
|
|
// Set continuation token if s3 returned truncated list
|
|
if isTruncated {
|
|
if len(objects) > 0 {
|
|
loi.NextContinuationToken = objects[len(objects)-1].Name
|
|
}
|
|
}
|
|
return loi, nil
|
|
}
|
|
|
|
// isGWObject returns true if it is a custom object
|
|
func isGWObject(objName string) bool {
|
|
isEncrypted := strings.Contains(objName, defaultMinioGWPrefix)
|
|
if !isEncrypted {
|
|
return true
|
|
}
|
|
// ignore temp part.meta files
|
|
if strings.Contains(objName, gwpartMetaJSON) {
|
|
return false
|
|
}
|
|
|
|
pfxSlice := strings.Split(objName, minio.SlashSeparator)
|
|
var i1, i2 int
|
|
for i := len(pfxSlice) - 1; i >= 0; i-- {
|
|
p := pfxSlice[i]
|
|
if p == defaultMinioGWPrefix {
|
|
i1 = i
|
|
}
|
|
if p == gwdareMetaJSON {
|
|
i2 = i
|
|
}
|
|
if i1 > 0 && i2 > 0 {
|
|
break
|
|
}
|
|
}
|
|
// incomplete uploads would have a uploadID between defaultMinioGWPrefix and gwdareMetaJSON
|
|
return i2 > 0 && i1 > 0 && i2-i1 == 1
|
|
}
|
|
|
|
// isPrefix returns true if prefix exists and is not an incomplete multipart upload entry
|
|
func (l *s3EncObjects) isPrefix(ctx context.Context, bucket, prefix string, fetchOwner bool, startAfter string) bool {
|
|
var continuationToken, delimiter string
|
|
|
|
for {
|
|
loi, e := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, fetchOwner, startAfter)
|
|
if e != nil {
|
|
return false
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
if isGWObject(obj.Name) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// GetObject reads an object from S3. Supports additional
|
|
// parameters like offset and length which are synonymous with
|
|
// HTTP Range requests.
|
|
func (l *s3EncObjects) GetObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
|
|
return l.getObject(ctx, bucket, key, startOffset, length, writer, etag, opts)
|
|
}
|
|
|
|
func (l *s3EncObjects) isGWEncrypted(ctx context.Context, bucket, object string) bool {
|
|
_, err := l.s3Objects.GetObjectInfo(ctx, bucket, getDareMetaPath(object), minio.ObjectOptions{})
|
|
return err == nil
|
|
}
|
|
|
|
// getDaremetadata fetches dare.meta from s3 backend and marshals into a structured format.
|
|
func (l *s3EncObjects) getGWMetadata(ctx context.Context, bucket, metaFileName string) (m gwMetaV1, err error) {
|
|
oi, err1 := l.s3Objects.GetObjectInfo(ctx, bucket, metaFileName, minio.ObjectOptions{})
|
|
if err1 != nil {
|
|
return m, err1
|
|
}
|
|
var buffer bytes.Buffer
|
|
err = l.s3Objects.GetObject(ctx, bucket, metaFileName, 0, oi.Size, &buffer, oi.ETag, minio.ObjectOptions{})
|
|
if err != nil {
|
|
return m, err
|
|
}
|
|
return readGWMetadata(ctx, buffer)
|
|
}
|
|
|
|
// writes dare metadata to the s3 backend
|
|
func (l *s3EncObjects) writeGWMetadata(ctx context.Context, bucket, metaFileName string, m gwMetaV1, o minio.ObjectOptions) error {
|
|
reader, err := getGWMetadata(ctx, bucket, metaFileName, m)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
_, err = l.s3Objects.PutObject(ctx, bucket, metaFileName, reader, o)
|
|
return err
|
|
}
|
|
|
|
// returns path of temporary metadata json file for the upload
|
|
func getTmpDareMetaPath(object, uploadID string) string {
|
|
return path.Join(getGWMetaPath(object), uploadID, gwdareMetaJSON)
|
|
}
|
|
|
|
// returns path of metadata json file for encrypted objects
|
|
func getDareMetaPath(object string) string {
|
|
return path.Join(getGWMetaPath(object), gwdareMetaJSON)
|
|
}
|
|
|
|
// returns path of temporary part metadata file for multipart uploads
|
|
func getPartMetaPath(object, uploadID string, partID int) string {
|
|
return path.Join(object, defaultMinioGWPrefix, uploadID, strconv.Itoa(partID), gwpartMetaJSON)
|
|
}
|
|
|
|
// deletes the custom dare metadata file saved at the backend
|
|
func (l *s3EncObjects) deleteGWMetadata(ctx context.Context, bucket, metaFileName string) error {
|
|
return l.s3Objects.DeleteObject(ctx, bucket, metaFileName)
|
|
}
|
|
|
|
func (l *s3EncObjects) getObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
|
|
var o minio.ObjectOptions
|
|
if minio.GlobalGatewaySSE.SSEC() {
|
|
o = opts
|
|
}
|
|
dmeta, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(key))
|
|
if err != nil {
|
|
// unencrypted content
|
|
return l.s3Objects.GetObject(ctx, bucket, key, startOffset, length, writer, etag, o)
|
|
}
|
|
if startOffset < 0 {
|
|
logger.LogIf(ctx, minio.InvalidRange{})
|
|
}
|
|
|
|
// For negative length read everything.
|
|
if length < 0 {
|
|
length = dmeta.Stat.Size - startOffset
|
|
}
|
|
// Reply back invalid range if the input offset and length fall out of range.
|
|
if startOffset > dmeta.Stat.Size || startOffset+length > dmeta.Stat.Size {
|
|
logger.LogIf(ctx, minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size})
|
|
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
|
|
}
|
|
// Get start part index and offset.
|
|
_, partOffset, err := dmeta.ObjectToPartOffset(ctx, startOffset)
|
|
if err != nil {
|
|
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
|
|
}
|
|
|
|
// Calculate endOffset according to length
|
|
endOffset := startOffset
|
|
if length > 0 {
|
|
endOffset += length - 1
|
|
}
|
|
|
|
// Get last part index to read given length.
|
|
if _, _, err := dmeta.ObjectToPartOffset(ctx, endOffset); err != nil {
|
|
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
|
|
}
|
|
return l.s3Objects.GetObject(ctx, bucket, key, partOffset, endOffset, writer, dmeta.ETag, o)
|
|
}
|
|
|
|
// GetObjectNInfo - returns object info and locked object ReadCloser
|
|
func (l *s3EncObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, o minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
|
|
var opts minio.ObjectOptions
|
|
if minio.GlobalGatewaySSE.SSEC() {
|
|
opts = o
|
|
}
|
|
objInfo, err := l.GetObjectInfo(ctx, bucket, object, opts)
|
|
if err != nil {
|
|
return l.s3Objects.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
|
}
|
|
objInfo.UserDefined = minio.CleanMinioInternalMetadataKeys(objInfo.UserDefined)
|
|
fn, off, length, err := minio.NewGetObjectReader(rs, objInfo, o)
|
|
if err != nil {
|
|
return nil, minio.ErrorRespToObjectError(err)
|
|
}
|
|
if l.isGWEncrypted(ctx, bucket, object) {
|
|
object = getGWContentPath(object)
|
|
}
|
|
pr, pw := io.Pipe()
|
|
go func() {
|
|
err := l.getObject(ctx, bucket, object, off, length, pw, objInfo.ETag, opts)
|
|
pw.CloseWithError(err)
|
|
}()
|
|
|
|
// Setup cleanup function to cause the above go-routine to
|
|
// exit in case of partial read
|
|
pipeCloser := func() { pr.Close() }
|
|
return fn(pr, h, o.CheckCopyPrecondFn, pipeCloser)
|
|
}
|
|
|
|
// GetObjectInfo reads object info and replies back ObjectInfo
|
|
// For custom gateway encrypted large objects, the ObjectInfo is retrieved from the dare.meta file.
|
|
func (l *s3EncObjects) GetObjectInfo(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
|
var opts minio.ObjectOptions
|
|
if minio.GlobalGatewaySSE.SSEC() {
|
|
opts = o
|
|
}
|
|
|
|
gwMeta, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(object))
|
|
if err != nil {
|
|
return l.s3Objects.GetObjectInfo(ctx, bucket, object, opts)
|
|
}
|
|
return gwMeta.ToObjectInfo(bucket, object), nil
|
|
}
|
|
|
|
// CopyObject copies an object from source bucket to a destination bucket.
|
|
func (l *s3EncObjects) CopyObject(ctx context.Context, srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo, s, d minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
|
cpSrcDstSame := strings.EqualFold(path.Join(srcBucket, srcObject), path.Join(dstBucket, dstObject))
|
|
if cpSrcDstSame {
|
|
var gwMeta gwMetaV1
|
|
if s.ServerSideEncryption != nil && d.ServerSideEncryption != nil &&
|
|
((s.ServerSideEncryption.Type() == encrypt.SSEC && d.ServerSideEncryption.Type() == encrypt.SSEC) ||
|
|
(s.ServerSideEncryption.Type() == encrypt.S3 && d.ServerSideEncryption.Type() == encrypt.S3)) {
|
|
gwMeta, err = l.getGWMetadata(ctx, srcBucket, getDareMetaPath(srcObject))
|
|
if err != nil {
|
|
return
|
|
}
|
|
header := make(http.Header)
|
|
if d.ServerSideEncryption != nil {
|
|
d.ServerSideEncryption.Marshal(header)
|
|
}
|
|
for k, v := range header {
|
|
srcInfo.UserDefined[k] = v[0]
|
|
}
|
|
gwMeta.Meta = srcInfo.UserDefined
|
|
if err = l.writeGWMetadata(ctx, dstBucket, getDareMetaPath(dstObject), gwMeta, minio.ObjectOptions{}); err != nil {
|
|
return objInfo, minio.ErrorRespToObjectError(err)
|
|
}
|
|
return gwMeta.ToObjectInfo(dstBucket, dstObject), nil
|
|
}
|
|
}
|
|
dstOpts := minio.ObjectOptions{ServerSideEncryption: d.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
|
|
return l.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, dstOpts)
|
|
}
|
|
|
|
// DeleteObject deletes a blob in bucket
|
|
// For custom gateway encrypted large objects, cleans up encrypted content and metadata files
|
|
// from the backend.
|
|
func (l *s3EncObjects) DeleteObject(ctx context.Context, bucket string, object string) error {
|
|
|
|
// Get dare meta json
|
|
if _, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(object)); err != nil {
|
|
return l.s3Objects.DeleteObject(ctx, bucket, object)
|
|
}
|
|
// delete encrypted object
|
|
l.s3Objects.DeleteObject(ctx, bucket, getGWContentPath(object))
|
|
return l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
|
|
}
|
|
|
|
// ListMultipartUploads lists all multipart uploads.
|
|
func (l *s3EncObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, e error) {
|
|
|
|
lmi, e = l.s3Objects.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
|
|
if e != nil {
|
|
return
|
|
}
|
|
lmi.KeyMarker = strings.TrimSuffix(lmi.KeyMarker, getGWContentPath(minio.SlashSeparator))
|
|
lmi.NextKeyMarker = strings.TrimSuffix(lmi.NextKeyMarker, getGWContentPath(minio.SlashSeparator))
|
|
for i := range lmi.Uploads {
|
|
lmi.Uploads[i].Object = strings.TrimSuffix(lmi.Uploads[i].Object, getGWContentPath(minio.SlashSeparator))
|
|
}
|
|
return
|
|
}
|
|
|
|
// NewMultipartUpload uploads object in multiple parts
|
|
func (l *s3EncObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (uploadID string, err error) {
|
|
var sseOpts encrypt.ServerSide
|
|
if o.ServerSideEncryption == nil {
|
|
return l.s3Objects.NewMultipartUpload(ctx, bucket, object, minio.ObjectOptions{UserDefined: o.UserDefined})
|
|
}
|
|
// Decide if sse options needed to be passed to backend
|
|
if (minio.GlobalGatewaySSE.SSEC() && o.ServerSideEncryption.Type() == encrypt.SSEC) ||
|
|
(minio.GlobalGatewaySSE.SSES3() && o.ServerSideEncryption.Type() == encrypt.S3) {
|
|
sseOpts = o.ServerSideEncryption
|
|
}
|
|
|
|
uploadID, err = l.s3Objects.NewMultipartUpload(ctx, bucket, getGWContentPath(object), minio.ObjectOptions{ServerSideEncryption: sseOpts})
|
|
if err != nil {
|
|
return
|
|
}
|
|
// Create uploadID and write a temporary dare.meta object under object/uploadID prefix
|
|
gwmeta := newGWMetaV1()
|
|
gwmeta.Meta = o.UserDefined
|
|
gwmeta.Stat.ModTime = time.Now().UTC()
|
|
err = l.writeGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID), gwmeta, minio.ObjectOptions{})
|
|
if err != nil {
|
|
return uploadID, minio.ErrorRespToObjectError(err)
|
|
}
|
|
return uploadID, nil
|
|
}
|
|
|
|
// PutObject creates a new object with the incoming data,
|
|
func (l *s3EncObjects) PutObject(ctx context.Context, bucket string, object string, data *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
|
var sseOpts encrypt.ServerSide
|
|
// Decide if sse options needed to be passed to backend
|
|
if opts.ServerSideEncryption != nil &&
|
|
((minio.GlobalGatewaySSE.SSEC() && opts.ServerSideEncryption.Type() == encrypt.SSEC) ||
|
|
(minio.GlobalGatewaySSE.SSES3() && opts.ServerSideEncryption.Type() == encrypt.S3) ||
|
|
opts.ServerSideEncryption.Type() == encrypt.KMS) {
|
|
sseOpts = opts.ServerSideEncryption
|
|
}
|
|
if opts.ServerSideEncryption == nil {
|
|
defer l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
|
|
defer l.DeleteObject(ctx, bucket, getGWContentPath(object))
|
|
return l.s3Objects.PutObject(ctx, bucket, object, data, minio.ObjectOptions{UserDefined: opts.UserDefined})
|
|
}
|
|
|
|
oi, err := l.s3Objects.PutObject(ctx, bucket, getGWContentPath(object), data, minio.ObjectOptions{ServerSideEncryption: sseOpts})
|
|
if err != nil {
|
|
return objInfo, minio.ErrorRespToObjectError(err)
|
|
}
|
|
|
|
gwMeta := newGWMetaV1()
|
|
gwMeta.Meta = make(map[string]string)
|
|
for k, v := range opts.UserDefined {
|
|
gwMeta.Meta[k] = v
|
|
}
|
|
encMD5 := data.MD5CurrentHexString()
|
|
|
|
gwMeta.ETag = encMD5
|
|
gwMeta.Stat.Size = oi.Size
|
|
gwMeta.Stat.ModTime = time.Now().UTC()
|
|
if err = l.writeGWMetadata(ctx, bucket, getDareMetaPath(object), gwMeta, minio.ObjectOptions{}); err != nil {
|
|
return objInfo, minio.ErrorRespToObjectError(err)
|
|
}
|
|
objInfo = gwMeta.ToObjectInfo(bucket, object)
|
|
// delete any unencrypted content of the same name created previously
|
|
l.s3Objects.DeleteObject(ctx, bucket, object)
|
|
return objInfo, nil
|
|
}
|
|
|
|
// PutObjectPart puts a part of object in bucket
|
|
func (l *s3EncObjects) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, data *minio.PutObjReader, opts minio.ObjectOptions) (pi minio.PartInfo, e error) {
|
|
|
|
if opts.ServerSideEncryption == nil {
|
|
return l.s3Objects.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
|
}
|
|
|
|
var s3Opts minio.ObjectOptions
|
|
// for sse-s3 encryption options should not be passed to backend
|
|
if opts.ServerSideEncryption != nil && opts.ServerSideEncryption.Type() == encrypt.SSEC && minio.GlobalGatewaySSE.SSEC() {
|
|
s3Opts = opts
|
|
}
|
|
|
|
uploadPath := getTmpGWMetaPath(object, uploadID)
|
|
tmpDareMeta := path.Join(uploadPath, gwdareMetaJSON)
|
|
_, err := l.s3Objects.GetObjectInfo(ctx, bucket, tmpDareMeta, minio.ObjectOptions{})
|
|
if err != nil {
|
|
return pi, minio.InvalidUploadID{UploadID: uploadID}
|
|
}
|
|
|
|
pi, e = l.s3Objects.PutObjectPart(ctx, bucket, getGWContentPath(object), uploadID, partID, data, s3Opts)
|
|
if e != nil {
|
|
return
|
|
}
|
|
gwMeta := newGWMetaV1()
|
|
gwMeta.Parts = make([]minio.ObjectPartInfo, 1)
|
|
// Add incoming part.
|
|
gwMeta.Parts[0] = minio.ObjectPartInfo{
|
|
Number: partID,
|
|
ETag: pi.ETag,
|
|
Size: pi.Size,
|
|
}
|
|
gwMeta.ETag = data.MD5CurrentHexString() // encrypted ETag
|
|
gwMeta.Stat.Size = pi.Size
|
|
gwMeta.Stat.ModTime = pi.LastModified
|
|
|
|
if err = l.writeGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, partID), gwMeta, minio.ObjectOptions{}); err != nil {
|
|
return pi, minio.ErrorRespToObjectError(err)
|
|
}
|
|
return minio.PartInfo{
|
|
Size: gwMeta.Stat.Size,
|
|
ETag: minio.CanonicalizeETag(gwMeta.ETag),
|
|
LastModified: gwMeta.Stat.ModTime,
|
|
PartNumber: partID,
|
|
}, nil
|
|
}
|
|
|
|
// CopyObjectPart creates a part in a multipart upload by copying
|
|
// existing object or a part of it.
|
|
func (l *s3EncObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string,
|
|
partID int, startOffset, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (p minio.PartInfo, err error) {
|
|
return l.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
|
|
}
|
|
|
|
// ListObjectParts returns all object parts for specified object in specified bucket
|
|
func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) {
|
|
// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
|
|
// there could be situations of dare.meta getting corrupted by competing upload parts.
|
|
dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
|
|
if err != nil {
|
|
return l.s3Objects.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
|
}
|
|
|
|
lpi, err = l.s3Objects.ListObjectParts(ctx, bucket, getGWContentPath(object), uploadID, partNumberMarker, maxParts, opts)
|
|
if err != nil {
|
|
return lpi, err
|
|
}
|
|
for i, part := range lpi.Parts {
|
|
partMeta, err := l.getGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, part.PartNumber))
|
|
if err != nil || len(partMeta.Parts) == 0 {
|
|
return lpi, minio.InvalidPart{}
|
|
}
|
|
lpi.Parts[i].ETag = partMeta.ETag
|
|
}
|
|
lpi.UserDefined = dm.Meta
|
|
lpi.Object = object
|
|
return lpi, nil
|
|
}
|
|
|
|
// AbortMultipartUpload aborts a ongoing multipart upload
|
|
func (l *s3EncObjects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
|
|
if _, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID)); err != nil {
|
|
return l.s3Objects.AbortMultipartUpload(ctx, bucket, object, uploadID)
|
|
}
|
|
|
|
if err := l.s3Objects.AbortMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID); err != nil {
|
|
return err
|
|
}
|
|
|
|
uploadPrefix := getTmpGWMetaPath(object, uploadID)
|
|
var continuationToken, startAfter, delimiter string
|
|
for {
|
|
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, uploadPrefix, continuationToken, delimiter, 1000, false, startAfter)
|
|
if err != nil {
|
|
return minio.InvalidUploadID{UploadID: uploadID}
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
if err := l.s3Objects.DeleteObject(ctx, bucket, obj.Name); err != nil {
|
|
return minio.ErrorRespToObjectError(err)
|
|
}
|
|
startAfter = obj.Name
|
|
}
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
|
|
func (l *s3EncObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) {
|
|
|
|
tmpMeta, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
|
|
if err != nil {
|
|
oi, e = l.s3Objects.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
|
|
if e == nil {
|
|
// delete any encrypted version of object that might exist
|
|
defer l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
|
|
defer l.DeleteObject(ctx, bucket, getGWContentPath(object))
|
|
}
|
|
return oi, e
|
|
}
|
|
gwMeta := newGWMetaV1()
|
|
gwMeta.Meta = make(map[string]string)
|
|
for k, v := range tmpMeta.Meta {
|
|
gwMeta.Meta[k] = v
|
|
}
|
|
// Allocate parts similar to incoming slice.
|
|
gwMeta.Parts = make([]minio.ObjectPartInfo, len(uploadedParts))
|
|
|
|
bkUploadedParts := make([]minio.CompletePart, len(uploadedParts))
|
|
// Calculate full object size.
|
|
var objectSize int64
|
|
|
|
// Validate each part and then commit to disk.
|
|
for i, part := range uploadedParts {
|
|
partMeta, err := l.getGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, part.PartNumber))
|
|
if err != nil || len(partMeta.Parts) == 0 {
|
|
return oi, minio.InvalidPart{}
|
|
}
|
|
bkUploadedParts[i] = minio.CompletePart{PartNumber: part.PartNumber, ETag: partMeta.Parts[0].ETag}
|
|
gwMeta.Parts[i] = partMeta.Parts[0]
|
|
objectSize += partMeta.Parts[0].Size
|
|
}
|
|
oi, e = l.s3Objects.CompleteMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID, bkUploadedParts, opts)
|
|
if e != nil {
|
|
return oi, e
|
|
}
|
|
|
|
//delete any unencrypted version of object that might be on the backend
|
|
defer l.s3Objects.DeleteObject(ctx, bucket, object)
|
|
|
|
// Save the final object size and modtime.
|
|
gwMeta.Stat.Size = objectSize
|
|
gwMeta.Stat.ModTime = time.Now().UTC()
|
|
gwMeta.ETag = oi.ETag
|
|
|
|
if err = l.writeGWMetadata(ctx, bucket, getDareMetaPath(object), gwMeta, minio.ObjectOptions{}); err != nil {
|
|
return oi, minio.ErrorRespToObjectError(err)
|
|
}
|
|
// Clean up any uploaded parts that are not being committed by this CompleteMultipart operation
|
|
var continuationToken, startAfter, delimiter string
|
|
uploadPrefix := getTmpGWMetaPath(object, uploadID)
|
|
done := false
|
|
for {
|
|
loi, lerr := l.s3Objects.ListObjectsV2(ctx, bucket, uploadPrefix, continuationToken, delimiter, 1000, false, startAfter)
|
|
if lerr != nil {
|
|
done = true
|
|
break
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
if !strings.HasPrefix(obj.Name, uploadPrefix) {
|
|
done = true
|
|
break
|
|
}
|
|
startAfter = obj.Name
|
|
l.s3Objects.DeleteObject(ctx, bucket, obj.Name)
|
|
}
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated || done {
|
|
break
|
|
}
|
|
}
|
|
|
|
return gwMeta.ToObjectInfo(bucket, object), nil
|
|
}
|
|
|
|
// getTmpGWMetaPath returns the prefix under which uploads in progress are stored on backend
|
|
func getTmpGWMetaPath(object, uploadID string) string {
|
|
return path.Join(object, defaultMinioGWPrefix, uploadID)
|
|
}
|
|
|
|
// getGWMetaPath returns the prefix under which custom object metadata and object are stored on backend after upload completes
|
|
func getGWMetaPath(object string) string {
|
|
return path.Join(object, defaultMinioGWPrefix)
|
|
}
|
|
|
|
// getGWContentPath returns the prefix under which custom object is stored on backend after upload completes
|
|
func getGWContentPath(object string) string {
|
|
return path.Join(object, defaultMinioGWPrefix, defaultGWContentFileName)
|
|
}
|
|
|
|
// Clean-up the stale incomplete encrypted multipart uploads. Should be run in a Go routine.
|
|
func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
|
|
ticker := time.NewTicker(cleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
l.cleanupStaleEncMultipartUploadsOnGW(ctx, expiry)
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupStaleMultipartUploads removes old custom encryption multipart uploads on backend
|
|
func (l *s3EncObjects) cleanupStaleEncMultipartUploadsOnGW(ctx context.Context, expiry time.Duration) {
|
|
for {
|
|
buckets, err := l.s3Objects.ListBuckets(ctx)
|
|
if err != nil {
|
|
break
|
|
}
|
|
for _, b := range buckets {
|
|
expParts := l.getStalePartsForBucket(ctx, b.Name, expiry)
|
|
for k := range expParts {
|
|
l.s3Objects.DeleteObject(ctx, b.Name, k)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *s3EncObjects) getStalePartsForBucket(ctx context.Context, bucket string, expiry time.Duration) (expParts map[string]string) {
|
|
var prefix, continuationToken, delimiter, startAfter string
|
|
expParts = make(map[string]string)
|
|
now := time.Now()
|
|
for {
|
|
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, false, startAfter)
|
|
if err != nil {
|
|
break
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
startAfter = obj.Name
|
|
if !strings.Contains(obj.Name, defaultMinioGWPrefix) {
|
|
continue
|
|
}
|
|
|
|
if isGWObject(obj.Name) {
|
|
continue
|
|
}
|
|
|
|
// delete temporary part.meta or dare.meta files for incomplete uploads that are past expiry
|
|
if (strings.HasSuffix(obj.Name, gwpartMetaJSON) || strings.HasSuffix(obj.Name, gwdareMetaJSON)) &&
|
|
now.Sub(obj.ModTime) > expiry {
|
|
expParts[obj.Name] = ""
|
|
}
|
|
}
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
|
|
var prefix, continuationToken, delimiter, startAfter string
|
|
expParts := make(map[string]string)
|
|
|
|
for {
|
|
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, false, startAfter)
|
|
if err != nil {
|
|
break
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
startAfter = obj.Name
|
|
if !strings.Contains(obj.Name, defaultMinioGWPrefix) {
|
|
return minio.BucketNotEmpty{}
|
|
}
|
|
if isGWObject(obj.Name) {
|
|
return minio.BucketNotEmpty{}
|
|
}
|
|
// delete temporary part.meta or dare.meta files for incomplete uploads
|
|
if strings.HasSuffix(obj.Name, gwpartMetaJSON) || strings.HasSuffix(obj.Name, gwdareMetaJSON) {
|
|
expParts[obj.Name] = ""
|
|
}
|
|
}
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
for k := range expParts {
|
|
l.s3Objects.DeleteObject(ctx, bucket, k)
|
|
}
|
|
err := l.Client.RemoveBucket(bucket)
|
|
if err != nil {
|
|
return minio.ErrorRespToObjectError(err, bucket)
|
|
}
|
|
return nil
|
|
}
|
|
|