Support encryption for CopyObject, GET-Range requests (#5544)

- Implement CopyObject encryption support
- Handle Range GETs for encrypted objects

Fixes #5193
master
Harshavardhana 7 years ago committed by kannappanr
parent b7536570f8
commit 7cc678c653
  1. 216
      cmd/encryption-v1.go
  2. 129
      cmd/encryption-v1_test.go
  3. 57
      cmd/fs-v1.go
  4. 7
      cmd/object-api-datatypes.go
  5. 157
      cmd/object-handlers.go
  6. 57
      cmd/xl-sets.go
  7. 26
      cmd/xl-v1-object.go
  8. 52
      pkg/ioutil/ioutil.go

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2017 Minio, Inc. * Minio Cloud Storage, (C) 2017, 2018 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"crypto/md5" "crypto/md5"
"crypto/rand" "crypto/rand"
"crypto/subtle"
"encoding/base64" "encoding/base64"
"errors" "errors"
"io" "io"
@ -51,6 +52,13 @@ const (
SSECustomerKey = "X-Amz-Server-Side-Encryption-Customer-Key" SSECustomerKey = "X-Amz-Server-Side-Encryption-Customer-Key"
// SSECustomerKeyMD5 is the AWS SSE-C encryption key MD5 HTTP header key. // SSECustomerKeyMD5 is the AWS SSE-C encryption key MD5 HTTP header key.
SSECustomerKeyMD5 = "X-Amz-Server-Side-Encryption-Customer-Key-MD5" SSECustomerKeyMD5 = "X-Amz-Server-Side-Encryption-Customer-Key-MD5"
// SSECopyCustomerAlgorithm is the AWS SSE-C algorithm HTTP header key for CopyObject API.
SSECopyCustomerAlgorithm = "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Algorithm"
// SSECopyCustomerKey is the AWS SSE-C encryption key HTTP header key for CopyObject API.
SSECopyCustomerKey = "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key"
// SSECopyCustomerKeyMD5 is the AWS SSE-C encryption key MD5 HTTP header key for CopyObject API.
SSECopyCustomerKeyMD5 = "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key-MD5"
) )
const ( const (
@ -129,6 +137,54 @@ func IsSSECustomerRequest(header http.Header) bool {
return header.Get(SSECustomerAlgorithm) != "" || header.Get(SSECustomerKey) != "" || header.Get(SSECustomerKeyMD5) != "" return header.Get(SSECustomerAlgorithm) != "" || header.Get(SSECustomerKey) != "" || header.Get(SSECustomerKeyMD5) != ""
} }
// IsSSECopyCustomerRequest returns true if the given HTTP header
// contains copy source server-side-encryption with customer provided key fields.
func IsSSECopyCustomerRequest(header http.Header) bool {
return header.Get(SSECopyCustomerAlgorithm) != "" || header.Get(SSECopyCustomerKey) != "" || header.Get(SSECopyCustomerKeyMD5) != ""
}
// ParseSSECopyCustomerRequest parses the SSE-C header fields of the provided request.
// It returns the client provided key on success.
func ParseSSECopyCustomerRequest(r *http.Request) (key []byte, err error) {
if !globalIsSSL { // minio only supports HTTP or HTTPS requests not both at the same time
// we cannot use r.TLS == nil here because Go's http implementation reflects on
// the net.Conn and sets the TLS field of http.Request only if it's an tls.Conn.
// Minio uses a BufConn (wrapping a tls.Conn) so the type check within the http package
// will always fail -> r.TLS is always nil even for TLS requests.
return nil, errInsecureSSERequest
}
header := r.Header
if algorithm := header.Get(SSECopyCustomerAlgorithm); algorithm != SSECustomerAlgorithmAES256 {
return nil, errInvalidSSEAlgorithm
}
if header.Get(SSECopyCustomerKey) == "" {
return nil, errMissingSSEKey
}
if header.Get(SSECopyCustomerKeyMD5) == "" {
return nil, errMissingSSEKeyMD5
}
key, err = base64.StdEncoding.DecodeString(header.Get(SSECopyCustomerKey))
if err != nil {
return nil, errInvalidSSEKey
}
if len(key) != SSECustomerKeySize {
return nil, errInvalidSSEKey
}
// Make sure we purged the keys from http headers by now.
header.Del(SSECopyCustomerKey)
keyMD5, err := base64.StdEncoding.DecodeString(header.Get(SSECopyCustomerKeyMD5))
if err != nil {
return nil, errSSEKeyMD5Mismatch
}
if md5Sum := md5.Sum(key); !bytes.Equal(md5Sum[:], keyMD5) {
return nil, errSSEKeyMD5Mismatch
}
return key, nil
}
// ParseSSECustomerRequest parses the SSE-C header fields of the provided request. // ParseSSECustomerRequest parses the SSE-C header fields of the provided request.
// It returns the client provided key on success. // It returns the client provided key on success.
func ParseSSECustomerRequest(r *http.Request) (key []byte, err error) { func ParseSSECustomerRequest(r *http.Request) (key []byte, err error) {
@ -154,11 +210,12 @@ func ParseSSECustomerRequest(r *http.Request) (key []byte, err error) {
if err != nil { if err != nil {
return nil, errInvalidSSEKey return nil, errInvalidSSEKey
} }
header.Del(SSECustomerKey) // make sure we do not save the key by accident
if len(key) != SSECustomerKeySize { if len(key) != SSECustomerKeySize {
return nil, errInvalidSSEKey return nil, errInvalidSSEKey
} }
// Make sure we purged the keys from http headers by now.
header.Del(SSECustomerKey)
keyMD5, err := base64.StdEncoding.DecodeString(header.Get(SSECustomerKeyMD5)) keyMD5, err := base64.StdEncoding.DecodeString(header.Get(SSECustomerKeyMD5))
if err != nil { if err != nil {
@ -170,14 +227,66 @@ func ParseSSECustomerRequest(r *http.Request) (key []byte, err error) {
return key, nil return key, nil
} }
// EncryptRequest takes the client provided content and encrypts the data // This function rotates old to new key.
// with the client provided key. It also marks the object as client-side-encrypted func rotateKey(oldKey []byte, newKey []byte, metadata map[string]string) error {
// and sets the correct headers. if subtle.ConstantTimeCompare(oldKey, newKey) == 1 {
func EncryptRequest(content io.Reader, r *http.Request, metadata map[string]string) (io.Reader, error) { return nil
key, err := ParseSSECustomerRequest(r) }
if err != nil { delete(metadata, SSECustomerKey) // make sure we do not save the key by accident
return nil, err
if metadata[ServerSideEncryptionSealAlgorithm] != SSESealAlgorithmDareSha256 { // currently DARE-SHA256 is the only option
return errObjectTampered
}
iv, err := base64.StdEncoding.DecodeString(metadata[ServerSideEncryptionIV])
if err != nil || len(iv) != 32 {
return errObjectTampered
}
sealedKey, err := base64.StdEncoding.DecodeString(metadata[ServerSideEncryptionSealedKey])
if err != nil || len(sealedKey) != 64 {
return errObjectTampered
}
sha := sha256.New() // derive key encryption key
sha.Write(oldKey)
sha.Write(iv)
keyEncryptionKey := sha.Sum(nil)
objectEncryptionKey := bytes.NewBuffer(nil) // decrypt object encryption key
n, err := sio.Decrypt(objectEncryptionKey, bytes.NewReader(sealedKey), sio.Config{
Key: keyEncryptionKey,
})
if n != 32 || err != nil {
// Either the provided key does not match or the object was tampered.
// To provide strict AWS S3 compatibility we return: access denied.
return errSSEKeyMismatch
}
nonce := make([]byte, 32) // generate random values for key derivation
if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
return err
}
niv := sha256.Sum256(nonce[:]) // derive key encryption key
sha = sha256.New()
sha.Write(newKey)
sha.Write(niv[:])
keyEncryptionKey = sha.Sum(nil)
sealedKeyW := bytes.NewBuffer(nil) // sealedKey := 16 byte header + 32 byte payload + 16 byte tag
n, err = sio.Encrypt(sealedKeyW, bytes.NewReader(objectEncryptionKey.Bytes()), sio.Config{
Key: keyEncryptionKey,
})
if n != 64 || err != nil {
return errors.New("failed to seal object encryption key") // if this happens there's a bug in the code (may panic ?)
} }
metadata[ServerSideEncryptionIV] = base64.StdEncoding.EncodeToString(niv[:])
metadata[ServerSideEncryptionSealAlgorithm] = SSESealAlgorithmDareSha256
metadata[ServerSideEncryptionSealedKey] = base64.StdEncoding.EncodeToString(sealedKeyW.Bytes())
return nil
}
func newEncryptReader(content io.Reader, key []byte, metadata map[string]string) (io.Reader, error) {
delete(metadata, SSECustomerKey) // make sure we do not save the key by accident delete(metadata, SSECustomerKey) // make sure we do not save the key by accident
// security notice: // security notice:
@ -189,7 +298,7 @@ func EncryptRequest(content io.Reader, r *http.Request, metadata map[string]stri
// be repeated in addition to reveal the object encryption key. // be repeated in addition to reveal the object encryption key.
// [ P(coll) ~= 1 / 2^((256 + 64) / 2) ] // [ P(coll) ~= 1 / 2^((256 + 64) / 2) ]
nonce := make([]byte, 64) // generate random values for key derivation nonce := make([]byte, 64) // generate random values for key derivation
if _, err = io.ReadFull(rand.Reader, nonce); err != nil { if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, err return nil, err
} }
sha := sha256.New() // derive object encryption key sha := sha256.New() // derive object encryption key
@ -222,15 +331,29 @@ func EncryptRequest(content io.Reader, r *http.Request, metadata map[string]stri
return reader, nil return reader, nil
} }
// DecryptRequest decrypts the object with the client provided key. It also removes // EncryptRequest takes the client provided content and encrypts the data
// the client-side-encryption metadata from the object and sets the correct headers. // with the client provided key. It also marks the object as client-side-encrypted
func DecryptRequest(client io.Writer, r *http.Request, metadata map[string]string) (io.WriteCloser, error) { // and sets the correct headers.
func EncryptRequest(content io.Reader, r *http.Request, metadata map[string]string) (io.Reader, error) {
key, err := ParseSSECustomerRequest(r) key, err := ParseSSECustomerRequest(r)
if err != nil { if err != nil {
return nil, err return nil, err
} }
delete(metadata, SSECustomerKey) // make sure we do not save the key by accident return newEncryptReader(content, key, metadata)
}
// DecryptCopyRequest decrypts the object with the client provided key. It also removes
// the client-side-encryption metadata from the object and sets the correct headers.
func DecryptCopyRequest(client io.Writer, r *http.Request, metadata map[string]string) (io.WriteCloser, error) {
key, err := ParseSSECopyCustomerRequest(r)
if err != nil {
return nil, err
}
delete(metadata, SSECopyCustomerKey) // make sure we do not save the key by accident
return newDecryptWriter(client, key, 0, metadata)
}
func newDecryptWriter(client io.Writer, key []byte, seqNumber uint32, metadata map[string]string) (io.WriteCloser, error) {
if metadata[ServerSideEncryptionSealAlgorithm] != SSESealAlgorithmDareSha256 { // currently DARE-SHA256 is the only option if metadata[ServerSideEncryptionSealAlgorithm] != SSESealAlgorithmDareSha256 { // currently DARE-SHA256 is the only option
return nil, errObjectTampered return nil, errObjectTampered
} }
@ -258,7 +381,10 @@ func DecryptRequest(client io.Writer, r *http.Request, metadata map[string]strin
return nil, errSSEKeyMismatch return nil, errSSEKeyMismatch
} }
writer, err := sio.DecryptWriter(client, sio.Config{Key: objectEncryptionKey.Bytes()}) writer, err := sio.DecryptWriter(client, sio.Config{
Key: objectEncryptionKey.Bytes(),
SequenceNumber: seqNumber,
})
if err != nil { if err != nil {
return nil, errInvalidSSEKey return nil, errInvalidSSEKey
} }
@ -269,6 +395,35 @@ func DecryptRequest(client io.Writer, r *http.Request, metadata map[string]strin
return writer, nil return writer, nil
} }
// DecryptRequestWithSequenceNumber decrypts the object with the client provided key. It also removes
// the client-side-encryption metadata from the object and sets the correct headers.
func DecryptRequestWithSequenceNumber(client io.Writer, r *http.Request, seqNumber uint32, metadata map[string]string) (io.WriteCloser, error) {
key, err := ParseSSECustomerRequest(r)
if err != nil {
return nil, err
}
delete(metadata, SSECustomerKey) // make sure we do not save the key by accident
return newDecryptWriter(client, key, seqNumber, metadata)
}
// DecryptRequest decrypts the object with the client provided key. It also removes
// the client-side-encryption metadata from the object and sets the correct headers.
func DecryptRequest(client io.Writer, r *http.Request, metadata map[string]string) (io.WriteCloser, error) {
return DecryptRequestWithSequenceNumber(client, r, 0, metadata)
}
// getStartOffset - get sequence number, start offset and rlength.
func getStartOffset(offset, length int64) (seqNumber uint32, startOffset int64, rlength int64) {
seqNumber = uint32(offset / (64 * 1024))
startOffset = int64(seqNumber) * (64*1024 + 32)
rlength = (length / (64 * 1024)) * (64*1024 + 32)
if length%(64*1024) > 0 {
rlength += 64*1024 + 32
}
return seqNumber, startOffset, rlength
}
// IsEncrypted returns true if the object is marked as encrypted. // IsEncrypted returns true if the object is marked as encrypted.
func (o *ObjectInfo) IsEncrypted() bool { func (o *ObjectInfo) IsEncrypted() bool {
if _, ok := o.UserDefined[ServerSideEncryptionIV]; ok { if _, ok := o.UserDefined[ServerSideEncryptionIV]; ok {
@ -315,6 +470,33 @@ func (o *ObjectInfo) EncryptedSize() int64 {
return size return size
} }
// DecryptCopyObjectInfo tries to decrypt the provided object if it is encrypted.
// It fails if the object is encrypted and the HTTP headers don't contain
// SSE-C headers or the object is not encrypted but SSE-C headers are provided. (AWS behavior)
// DecryptObjectInfo returns 'ErrNone' if the object is not encrypted or the
// decryption succeeded.
//
// DecryptCopyObjectInfo also returns whether the object is encrypted or not.
func DecryptCopyObjectInfo(info *ObjectInfo, headers http.Header) (apiErr APIErrorCode, encrypted bool) {
// Directories are never encrypted.
if info.IsDir {
return ErrNone, false
}
if apiErr, encrypted = ErrNone, info.IsEncrypted(); !encrypted && IsSSECopyCustomerRequest(headers) {
apiErr = ErrInvalidEncryptionParameters
} else if encrypted {
if !IsSSECopyCustomerRequest(headers) {
apiErr = ErrSSEEncryptedObject
return
}
var err error
if info.Size, err = info.DecryptedSize(); err != nil {
apiErr = toAPIErrorCode(err)
}
}
return
}
// DecryptObjectInfo tries to decrypt the provided object if it is encrypted. // DecryptObjectInfo tries to decrypt the provided object if it is encrypted.
// It fails if the object is encrypted and the HTTP headers don't contain // It fails if the object is encrypted and the HTTP headers don't contain
// SSE-C headers or the object is not encrypted but SSE-C headers are provided. (AWS behavior) // SSE-C headers or the object is not encrypted but SSE-C headers are provided. (AWS behavior)
@ -323,6 +505,10 @@ func (o *ObjectInfo) EncryptedSize() int64 {
// //
// DecryptObjectInfo also returns whether the object is encrypted or not. // DecryptObjectInfo also returns whether the object is encrypted or not.
func DecryptObjectInfo(info *ObjectInfo, headers http.Header) (apiErr APIErrorCode, encrypted bool) { func DecryptObjectInfo(info *ObjectInfo, headers http.Header) (apiErr APIErrorCode, encrypted bool) {
// Directories are never encrypted.
if info.IsDir {
return ErrNone, false
}
if apiErr, encrypted = ErrNone, info.IsEncrypted(); !encrypted && IsSSECustomerRequest(headers) { if apiErr, encrypted = ErrNone, info.IsEncrypted(); !encrypted && IsSSECustomerRequest(headers) {
apiErr = ErrInvalidEncryptionParameters apiErr = ErrInvalidEncryptionParameters
} else if encrypted { } else if encrypted {

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2017 Minio, Inc. * Minio Cloud Storage, (C) 2017, 2018 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -22,6 +22,31 @@ import (
"testing" "testing"
) )
var isSSECopyCustomerRequestTests = []struct {
headers map[string]string
sseRequest bool
}{
{headers: map[string]string{SSECopyCustomerAlgorithm: "AES256", SSECopyCustomerKey: "key", SSECopyCustomerKeyMD5: "md5"}, sseRequest: true}, // 0
{headers: map[string]string{SSECopyCustomerAlgorithm: "AES256"}, sseRequest: true}, // 1
{headers: map[string]string{SSECopyCustomerKey: "key"}, sseRequest: true}, // 2
{headers: map[string]string{SSECopyCustomerKeyMD5: "md5"}, sseRequest: true}, // 3
{headers: map[string]string{}, sseRequest: false}, // 4
{headers: map[string]string{SSECopyCustomerAlgorithm + " ": "AES256", " " + SSECopyCustomerKey: "key", SSECopyCustomerKeyMD5 + " ": "md5"}, sseRequest: false}, // 5
{headers: map[string]string{SSECopyCustomerAlgorithm: "", SSECopyCustomerKey: "", SSECopyCustomerKeyMD5: ""}, sseRequest: false}, // 6
}
func TestIsSSECopyCustomerRequest(t *testing.T) {
for i, test := range isSSECopyCustomerRequestTests {
headers := http.Header{}
for k, v := range test.headers {
headers.Set(k, v)
}
if IsSSECopyCustomerRequest(headers) != test.sseRequest {
t.Errorf("Test %d: Expected IsSSECopyCustomerRequest to return %v", i, test.sseRequest)
}
}
}
var isSSECustomerRequestTests = []struct { var isSSECustomerRequestTests = []struct {
headers map[string]string headers map[string]string
sseRequest bool sseRequest bool
@ -145,6 +170,108 @@ func TestParseSSECustomerRequest(t *testing.T) {
if (err == nil || err == errSSEKeyMD5Mismatch) && key != "" { if (err == nil || err == errSSEKeyMD5Mismatch) && key != "" {
t.Errorf("Test %d: Client key survived parsing - found key: %v", i, key) t.Errorf("Test %d: Client key survived parsing - found key: %v", i, key)
} }
}
}
var parseSSECopyCustomerRequestTests = []struct {
headers map[string]string
useTLS bool
err error
}{
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES256",
SSECopyCustomerKey: "XAm0dRrJsEsyPb1UuFNezv1bl9hxuYsgUVC/MUctE2k=", // 0
SSECopyCustomerKeyMD5: "bY4wkxQejw9mUJfo72k53A==",
},
useTLS: true, err: nil,
},
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES256",
SSECopyCustomerKey: "XAm0dRrJsEsyPb1UuFNezv1bl9hxuYsgUVC/MUctE2k=", // 1
SSECopyCustomerKeyMD5: "bY4wkxQejw9mUJfo72k53A==",
},
useTLS: false, err: errInsecureSSERequest,
},
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES 256",
SSECopyCustomerKey: "XAm0dRrJsEsyPb1UuFNezv1bl9hxuYsgUVC/MUctE2k=", // 2
SSECopyCustomerKeyMD5: "bY4wkxQejw9mUJfo72k53A==",
},
useTLS: true, err: errInvalidSSEAlgorithm,
},
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES256",
SSECopyCustomerKey: "NjE0SL87s+ZhYtaTrg5eI5cjhCQLGPVMKenPG2bCJFw=", // 3
SSECopyCustomerKeyMD5: "H+jq/LwEOEO90YtiTuNFVw==",
},
useTLS: true, err: errSSEKeyMD5Mismatch,
},
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES256",
SSECopyCustomerKey: " jE0SL87s+ZhYtaTrg5eI5cjhCQLGPVMKenPG2bCJFw=", // 4
SSECopyCustomerKeyMD5: "H+jq/LwEOEO90YtiTuNFVw==",
},
useTLS: true, err: errInvalidSSEKey,
},
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES256",
SSECopyCustomerKey: "NjE0SL87s+ZhYtaTrg5eI5cjhCQLGPVMKenPG2bCJFw=", // 5
SSECopyCustomerKeyMD5: " +jq/LwEOEO90YtiTuNFVw==",
},
useTLS: true, err: errSSEKeyMD5Mismatch,
},
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES256",
SSECopyCustomerKey: "vFQ9ScFOF6Tu/BfzMS+rVMvlZGJHi5HmGJenJfrfKI45", // 6
SSECopyCustomerKeyMD5: "9KPgDdZNTHimuYCwnJTp5g==",
},
useTLS: true, err: errInvalidSSEKey,
},
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES256",
SSECopyCustomerKey: "", // 7
SSECopyCustomerKeyMD5: "9KPgDdZNTHimuYCwnJTp5g==",
},
useTLS: true, err: errMissingSSEKey,
},
{
headers: map[string]string{
SSECopyCustomerAlgorithm: "AES256",
SSECopyCustomerKey: "vFQ9ScFOF6Tu/BfzMS+rVMvlZGJHi5HmGJenJfrfKI45", // 8
SSECopyCustomerKeyMD5: "",
},
useTLS: true, err: errMissingSSEKeyMD5,
},
}
func TestParseSSECopyCustomerRequest(t *testing.T) {
defer func(flag bool) { globalIsSSL = flag }(globalIsSSL)
for i, test := range parseSSECopyCustomerRequestTests {
headers := http.Header{}
for k, v := range test.headers {
headers.Set(k, v)
}
request := &http.Request{}
request.Header = headers
globalIsSSL = test.useTLS
_, err := ParseSSECopyCustomerRequest(request)
if err != test.err {
t.Errorf("Test %d: Parse returned: %v want: %v", i, err, test.err)
}
key := request.Header.Get(SSECopyCustomerKey)
if (err == nil || err == errSSEKeyMD5Mismatch) && key != "" {
t.Errorf("Test %d: Client key survived parsing - found key: %v", i, key)
}
} }
} }

@ -361,7 +361,7 @@ func (fs *FSObjects) DeleteBucket(bucket string) error {
// if source object and destination object are same we only // if source object and destination object are same we only
// update metadata. // update metadata.
func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo) (oi ObjectInfo, e error) { func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo) (oi ObjectInfo, e error) {
cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
// Hold write lock on destination since in both cases // Hold write lock on destination since in both cases
// - if source and destination are same // - if source and destination are same
// - if source and destination are different // - if source and destination are different
@ -387,18 +387,9 @@ func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
return oi, toObjectErr(err, srcBucket) return oi, toObjectErr(err, srcBucket)
} }
// Stat the file to get file size. if cpSrcDstSame && srcInfo.metadataOnly {
fi, err := fsStatFile(pathJoin(fs.fsPath, srcBucket, srcObject))
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
// Check if this request is only metadata update.
cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
if cpMetadataOnly {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, srcBucket, srcObject, fsMetaJSONFile) fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, srcBucket, srcObject, fsMetaJSONFile)
var wlk *lock.LockedFile wlk, err := fs.rwPool.Write(fsMetaPath)
wlk, err = fs.rwPool.Write(fsMetaPath)
if err != nil { if err != nil {
return oi, toObjectErr(errors.Trace(err), srcBucket, srcObject) return oi, toObjectErr(errors.Trace(err), srcBucket, srcObject)
} }
@ -412,39 +403,35 @@ func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
return oi, toObjectErr(err, srcBucket, srcObject) return oi, toObjectErr(err, srcBucket, srcObject)
} }
// Stat the file to get file size.
fi, err := fsStatFile(pathJoin(fs.fsPath, srcBucket, srcObject))
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
// Return the new object info. // Return the new object info.
return fsMeta.ToObjectInfo(srcBucket, srcObject, fi), nil return fsMeta.ToObjectInfo(srcBucket, srcObject, fi), nil
} }
// Length of the file to read.
length := fi.Size()
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
go func() { go func() {
var startOffset int64 // Read the whole file. if gerr := fs.getObject(srcBucket, srcObject, 0, srcInfo.Size, srcInfo.Writer, srcInfo.ETag, !cpSrcDstSame); gerr != nil {
if gerr := fs.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil { if gerr = srcInfo.Writer.Close(); gerr != nil {
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject) errorIf(gerr, "Unable to read the object %s/%s.", srcBucket, srcObject)
pipeWriter.CloseWithError(gerr) }
return return
} }
pipeWriter.Close() // Close writer explicitly signalling we wrote all data. // Close writer explicitly signalling we wrote all data.
}() if gerr := srcInfo.Writer.Close(); gerr != nil {
errorIf(gerr, "Unable to read the object %s/%s.", srcBucket, srcObject)
hashReader, err := hash.NewReader(pipeReader, length, "", "") return
if err != nil {
return oi, toObjectErr(err, dstBucket, dstObject)
} }
}()
objInfo, err := fs.putObject(dstBucket, dstObject, hashReader, srcInfo.UserDefined) objInfo, err := fs.putObject(dstBucket, dstObject, srcInfo.Reader, srcInfo.UserDefined)
if err != nil { if err != nil {
return oi, toObjectErr(err, dstBucket, dstObject) return oi, toObjectErr(err, dstBucket, dstObject)
} }
// Explicitly close the reader.
pipeReader.Close()
return objInfo, nil return objInfo, nil
} }
@ -465,11 +452,11 @@ func (fs *FSObjects) GetObject(bucket, object string, offset int64, length int64
return err return err
} }
defer objectLock.RUnlock() defer objectLock.RUnlock()
return fs.getObject(bucket, object, offset, length, writer, etag) return fs.getObject(bucket, object, offset, length, writer, etag, true)
} }
// getObject - wrapper for GetObject // getObject - wrapper for GetObject
func (fs *FSObjects) getObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) { func (fs *FSObjects) getObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string, lock bool) (err error) {
if _, err = fs.statBucketDir(bucket); err != nil { if _, err = fs.statBucketDir(bucket); err != nil {
return toObjectErr(err, bucket) return toObjectErr(err, bucket)
} }
@ -492,12 +479,14 @@ func (fs *FSObjects) getObject(bucket, object string, offset int64, length int64
if bucket != minioMetaBucket { if bucket != minioMetaBucket {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile) fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
if lock {
_, err = fs.rwPool.Open(fsMetaPath) _, err = fs.rwPool.Open(fsMetaPath)
if err != nil && err != errFileNotFound { if err != nil && err != errFileNotFound {
return toObjectErr(errors.Trace(err), bucket, object) return toObjectErr(errors.Trace(err), bucket, object)
} }
defer fs.rwPool.Close(fsMetaPath) defer fs.rwPool.Close(fsMetaPath)
} }
}
if etag != "" { if etag != "" {
objEtag, perr := fs.getObjectETag(bucket, object) objEtag, perr := fs.getObjectETag(bucket, object)

@ -17,8 +17,10 @@
package cmd package cmd
import ( import (
"io"
"time" "time"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/madmin"
) )
@ -98,6 +100,11 @@ type ObjectInfo struct {
// User-Defined metadata // User-Defined metadata
UserDefined map[string]string UserDefined map[string]string
// Implements writer and reader used by CopyObject API
Writer io.WriteCloser `json:"-"`
Reader *hash.Reader `json:"-"`
metadataOnly bool
} }
// ListPartsInfo - represents list of all parts. // ListPartsInfo - represents list of all parts.

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2015 Minio, Inc. * Minio Cloud Storage, (C) 2015-2018 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package cmd
import ( import (
"encoding/hex" "encoding/hex"
"encoding/xml" "encoding/xml"
"fmt"
"io" "io"
goioutil "io/ioutil" goioutil "io/ioutil"
"net" "net"
@ -158,19 +159,24 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
writer = w writer = w
if objectAPI.IsEncryptionSupported() { if objectAPI.IsEncryptionSupported() {
if IsSSECustomerRequest(r.Header) { if IsSSECustomerRequest(r.Header) {
writer, err = DecryptRequest(writer, r, objInfo.UserDefined) // Response writer should be limited early on for decryption upto required length,
// additionally also skipping mod(offset)64KiB boundaries.
writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length)
var sequenceNumber uint32
sequenceNumber, startOffset, length = getStartOffset(startOffset, length)
if length > objInfo.EncryptedSize() {
length = objInfo.EncryptedSize()
}
writer, err = DecryptRequestWithSequenceNumber(writer, r, sequenceNumber, objInfo.UserDefined)
if err != nil { if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return return
} }
w.Header().Set(SSECustomerAlgorithm, r.Header.Get(SSECustomerAlgorithm)) w.Header().Set(SSECustomerAlgorithm, r.Header.Get(SSECustomerAlgorithm))
w.Header().Set(SSECustomerKeyMD5, r.Header.Get(SSECustomerKeyMD5)) w.Header().Set(SSECustomerKeyMD5, r.Header.Get(SSECustomerKeyMD5))
if startOffset != 0 || length < objInfo.Size {
writeErrorResponse(w, ErrNotImplemented, r.URL) // SSE-C requests with HTTP range are not supported yet
return
}
length = objInfo.EncryptedSize()
} }
} }
@ -178,14 +184,16 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
setHeadGetRespHeaders(w, r.URL.Query()) setHeadGetRespHeaders(w, r.URL.Query())
httpWriter := ioutil.WriteOnClose(writer) httpWriter := ioutil.WriteOnClose(writer)
// Reads the object at startOffset and writes to mw. // Reads the object at startOffset and writes to httpWriter.
if err = objectAPI.GetObject(bucket, object, startOffset, length, httpWriter, objInfo.ETag); err != nil { if err = objectAPI.GetObject(bucket, object, startOffset, length, httpWriter, objInfo.ETag); err != nil {
errorIf(err, "Unable to write to client.") errorIf(err, "Unable to write to client.")
if !httpWriter.HasWritten() { // write error response only if no data has been written to client yet if !httpWriter.HasWritten() { // write error response only if no data has been written to client yet
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
} }
httpWriter.Close()
return return
} }
if err = httpWriter.Close(); err != nil { if err = httpWriter.Close(); err != nil {
if !httpWriter.HasWritten() { // write error response only if no data has been written to client yet if !httpWriter.HasWritten() { // write error response only if no data has been written to client yet
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
@ -241,7 +249,7 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
return return
} }
if objectAPI.IsEncryptionSupported() && !objInfo.IsDir { if objectAPI.IsEncryptionSupported() {
if apiErr, encrypted := DecryptObjectInfo(&objInfo, r.Header); apiErr != ErrNone { if apiErr, encrypted := DecryptObjectInfo(&objInfo, r.Header); apiErr != ErrNone {
writeErrorResponse(w, apiErr, r.URL) writeErrorResponse(w, apiErr, r.URL)
return return
@ -287,7 +295,14 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
// Extract metadata relevant for an CopyObject operation based on conditional // Extract metadata relevant for an CopyObject operation based on conditional
// header values specified in X-Amz-Metadata-Directive. // header values specified in X-Amz-Metadata-Directive.
func getCpObjMetadataFromHeader(header http.Header, defaultMeta map[string]string) (map[string]string, error) { func getCpObjMetadataFromHeader(header http.Header, userMeta map[string]string) (map[string]string, error) {
// Make a copy of the supplied metadata to avoid
// to change the original one.
defaultMeta := make(map[string]string, len(userMeta))
for k, v := range userMeta {
defaultMeta[k] = v
}
// if x-amz-metadata-directive says REPLACE then // if x-amz-metadata-directive says REPLACE then
// we extract metadata from the input headers. // we extract metadata from the input headers.
if isMetadataReplace(header) { if isMetadataReplace(header) {
@ -340,26 +355,26 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return return
} }
if IsSSECustomerRequest(r.Header) { // handle SSE-C requests
// SSE-C is not implemented for CopyObject operations yet
writeErrorResponse(w, ErrNotImplemented, r.URL)
return
}
// Check if metadata directive is valid. // Check if metadata directive is valid.
if !isMetadataDirectiveValid(r.Header) { if !isMetadataDirectiveValid(r.Header) {
writeErrorResponse(w, ErrInvalidMetadataDirective, r.URL) writeErrorResponse(w, ErrInvalidMetadataDirective, r.URL)
return return
} }
cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
srcInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject) srcInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject)
if err != nil { if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return return
} }
if objectAPI.IsEncryptionSupported() {
if apiErr, _ := DecryptCopyObjectInfo(&srcInfo, r.Header); apiErr != ErrNone {
writeErrorResponse(w, apiErr, r.URL)
return
}
}
// Verify before x-amz-copy-source preconditions before continuing with CopyObject. // Verify before x-amz-copy-source preconditions before continuing with CopyObject.
if checkCopyObjectPreconditions(w, r, srcInfo) { if checkCopyObjectPreconditions(w, r, srcInfo) {
return return
@ -371,25 +386,119 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return return
} }
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
// We have to copy metadata only if source and destination are same.
// this changes for encryption which can be observed below.
if cpSrcDstSame {
srcInfo.metadataOnly = true
}
var writer io.WriteCloser = pipeWriter
var reader io.Reader = pipeReader
var encMetadata = make(map[string]string)
if objectAPI.IsEncryptionSupported() {
var oldKey, newKey []byte
sseCopyC := IsSSECopyCustomerRequest(r.Header)
sseC := IsSSECustomerRequest(r.Header)
if sseCopyC {
oldKey, err = ParseSSECopyCustomerRequest(r)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
if sseC {
newKey, err = ParseSSECustomerRequest(r)
if err != nil {
pipeReader.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
// AWS S3 implementation requires us to only rotate keys
// when/ both keys are provided and destination is same
// otherwise we proceed to encrypt/decrypt.
if len(oldKey) > 0 && len(newKey) > 0 && cpSrcDstSame {
for k, v := range srcInfo.UserDefined {
encMetadata[k] = v
}
if err = rotateKey(oldKey, newKey, encMetadata); err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
} else {
if sseCopyC {
// Source is encrypted make sure to save the encrypted size.
if srcInfo.IsEncrypted() {
srcInfo.Size = srcInfo.EncryptedSize()
}
writer, err = newDecryptWriter(pipeWriter, oldKey, 0, srcInfo.UserDefined)
if err != nil {
pipeWriter.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// We are not only copying just metadata instead
// we are creating a new object at this point, even
// if source and destination are same objects.
srcInfo.metadataOnly = false
}
if sseC {
reader, err = newEncryptReader(pipeReader, newKey, encMetadata)
if err != nil {
pipeReader.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// We are not only copying just metadata instead
// we are creating a new object at this point, even
// if source and destination are same objects.
srcInfo.metadataOnly = false
}
}
}
srcInfo.Writer = writer
srcInfo.UserDefined, err = getCpObjMetadataFromHeader(r.Header, srcInfo.UserDefined) srcInfo.UserDefined, err = getCpObjMetadataFromHeader(r.Header, srcInfo.UserDefined)
if err != nil { if err != nil {
pipeReader.CloseWithError(err)
errorIf(err, "found invalid http request header") errorIf(err, "found invalid http request header")
writeErrorResponse(w, ErrInternalError, r.URL) writeErrorResponse(w, ErrInternalError, r.URL)
return return
} }
// We need to preserve the encryption headers set in EncryptRequest,
// so we do not want to override them, copy them instead.
for k, v := range encMetadata {
srcInfo.UserDefined[k] = v
}
// Make sure to remove saved etag if any, CopyObject calculates a new one. // Make sure to remove saved etag if any, CopyObject calculates a new one.
delete(srcInfo.UserDefined, "etag") delete(srcInfo.UserDefined, "etag")
// Check if x-amz-metadata-directive was not set to REPLACE and source, // Check if x-amz-metadata-directive was not set to REPLACE and source,
// desination are same objects. // desination are same objects. Apply this restriction also when
if !isMetadataReplace(r.Header) && cpSrcDstSame { // metadataOnly is true indicating that we are not overwriting the object.
if !isMetadataReplace(r.Header) && srcInfo.metadataOnly {
pipeReader.CloseWithError(fmt.Errorf("invalid copy dest"))
// If x-amz-metadata-directive is not set to REPLACE then we need // If x-amz-metadata-directive is not set to REPLACE then we need
// to error out if source and destination are same. // to error out if source and destination are same.
writeErrorResponse(w, ErrInvalidCopyDest, r.URL) writeErrorResponse(w, ErrInvalidCopyDest, r.URL)
return return
} }
hashReader, err := hash.NewReader(reader, srcInfo.Size, "", "") // do not try to verify encrypted content
if err != nil {
pipeReader.CloseWithError(err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
srcInfo.Reader = hashReader
// Copy source object to destination, if source and destination // Copy source object to destination, if source and destination
// object is same then only metadata is updated. // object is same then only metadata is updated.
objInfo, err := objectAPI.CopyObject(srcBucket, srcObject, dstBucket, dstObject, srcInfo) objInfo, err := objectAPI.CopyObject(srcBucket, srcObject, dstBucket, dstObject, srcInfo)
@ -398,6 +507,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return return
} }
pipeReader.Close()
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response) encodedSuccessResponse := encodeResponse(response)
@ -557,8 +668,9 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return return
} }
if objectAPI.IsEncryptionSupported() { if objectAPI.IsEncryptionSupported() {
if IsSSECustomerRequest(r.Header) && size > 0 { // handle SSE-C requests if IsSSECustomerRequest(r.Header) && !hasSuffix(object, slashSeparator) { // handle SSE-C requests
reader, err = EncryptRequest(hashReader, r, metadata) reader, err = EncryptRequest(hashReader, r, metadata)
if err != nil { if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
@ -578,6 +690,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return return
} }
w.Header().Set("ETag", "\""+objInfo.ETag+"\"") w.Header().Set("ETag", "\""+objInfo.ETag+"\"")
if objectAPI.IsEncryptionSupported() { if objectAPI.IsEncryptionSupported() {
if IsSSECustomerRequest(r.Header) { if IsSSECustomerRequest(r.Header) {

@ -551,39 +551,48 @@ func (s *xlSets) CopyObject(srcBucket, srcObject, destBucket, destObject string,
destSet := s.getHashedSet(destObject) destSet := s.getHashedSet(destObject)
// Check if this request is only metadata update. // Check if this request is only metadata update.
cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject)) cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject))
if cpMetadataOnly { if cpSrcDstSame && srcInfo.metadataOnly {
return srcSet.CopyObject(srcBucket, srcObject, destBucket, destObject, srcInfo) return srcSet.CopyObject(srcBucket, srcObject, destBucket, destObject, srcInfo)
} }
// Initialize pipe. // Hold write lock on destination since in both cases
pipeReader, pipeWriter := io.Pipe() // - if source and destination are same
// - if source and destination are different
// it is the sole mutating state.
objectDWLock := destSet.nsMutex.NewNSLock(destBucket, destObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
defer objectDWLock.Unlock()
// if source and destination are different, we have to hold
// additional read lock as well to protect against writes on
// source.
if !cpSrcDstSame {
// Hold read locks on source object only if we are
// going to read data from source object.
objectSRLock := srcSet.nsMutex.NewNSLock(srcBucket, srcObject)
if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil {
return objInfo, err
}
defer objectSRLock.RUnlock()
}
go func() { go func() {
if gerr := srcSet.GetObject(srcBucket, srcObject, 0, srcInfo.Size, pipeWriter, srcInfo.ETag); gerr != nil { if gerr := srcSet.getObject(srcBucket, srcObject, 0, srcInfo.Size, srcInfo.Writer, srcInfo.ETag); gerr != nil {
errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject) if gerr = srcInfo.Writer.Close(); gerr != nil {
pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject)) errorIf(gerr, "Unable to read the object %s/%s.", srcBucket, srcObject)
return
} }
pipeWriter.Close() // Close writer explicitly signalling we wrote all data. return
}()
hashReader, err := hash.NewReader(pipeReader, srcInfo.Size, "", "")
if err != nil {
pipeReader.CloseWithError(err)
return srcInfo, toObjectErr(errors.Trace(err), destBucket, destObject)
} }
// Close writer explicitly signalling we wrote all data.
objInfo, err = destSet.PutObject(destBucket, destObject, hashReader, srcInfo.UserDefined) if gerr := srcInfo.Writer.Close(); gerr != nil {
if err != nil { errorIf(gerr, "Unable to read the object %s/%s.", srcBucket, srcObject)
pipeReader.CloseWithError(err) return
return objInfo, err
} }
}()
// Explicitly close the reader. return destSet.putObject(destBucket, destObject, srcInfo.Reader, srcInfo.UserDefined)
pipeReader.Close()
return objInfo, nil
} }
// Returns function "listDir" of the type listDirFunc. // Returns function "listDir" of the type listDirFunc.

@ -80,28 +80,7 @@ func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks [
// if source object and destination object are same we only // if source object and destination object are same we only
// update metadata. // update metadata.
func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo) (oi ObjectInfo, e error) { func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo) (oi ObjectInfo, e error) {
cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
// Hold write lock on destination since in both cases
// - if source and destination are same
// - if source and destination are different
// it is the sole mutating state.
objectDWLock := xl.nsMutex.NewNSLock(dstBucket, dstObject)
if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
return oi, err
}
defer objectDWLock.Unlock()
// if source and destination are different, we have to hold
// additional read lock as well to protect against writes on
// source.
if !cpSrcDstSame {
// Hold read locks on source object only if we are
// going to read data from source object.
objectSRLock := xl.nsMutex.NewNSLock(srcBucket, srcObject)
if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil {
return oi, err
}
defer objectSRLock.RUnlock()
}
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
metaArr, errs := readAllXLMetadata(xl.getDisks(), srcBucket, srcObject) metaArr, errs := readAllXLMetadata(xl.getDisks(), srcBucket, srcObject)
@ -132,8 +111,7 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
length := xlMeta.Stat.Size length := xlMeta.Stat.Size
// Check if this request is only metadata update. // Check if this request is only metadata update.
cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) if cpSrcDstSame {
if cpMetadataOnly {
xlMeta.Meta = srcInfo.UserDefined xlMeta.Meta = srcInfo.UserDefined
partsMetadata := make([]xlMetaV1, len(xl.getDisks())) partsMetadata := make([]xlMetaV1, len(xl.getDisks()))
// Update `xl.json` content on each disks. // Update `xl.json` content on each disks.

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2017 Minio, Inc. * Minio Cloud Storage, (C) 2017, 2018 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -64,3 +64,53 @@ func (w *WriteOnCloser) HasWritten() bool { return w.hasWritten }
func WriteOnClose(w io.Writer) *WriteOnCloser { func WriteOnClose(w io.Writer) *WriteOnCloser {
return &WriteOnCloser{w, false} return &WriteOnCloser{w, false}
} }
// LimitWriter implements io.WriteCloser.
//
// This is implemented such that we want to restrict
// an enscapsulated writer upto a certain length
// and skip a certain number of bytes.
type LimitWriter struct {
io.Writer
skipBytes int64
wLimit int64
}
// Implements the io.Writer interface limiting upto
// configured length, also skips the first N bytes.
func (w *LimitWriter) Write(p []byte) (n int, err error) {
n = len(p)
var n1 int
if w.skipBytes > 0 {
if w.skipBytes >= int64(len(p)) {
w.skipBytes = w.skipBytes - int64(len(p))
return n, nil
}
p = p[w.skipBytes:]
w.skipBytes = 0
}
if w.wLimit == 0 {
return n, nil
}
if w.wLimit < int64(len(p)) {
n1, err = w.Writer.Write(p[:w.wLimit])
w.wLimit = w.wLimit - int64(n1)
return n, err
}
n1, err = w.Writer.Write(p)
w.wLimit = w.wLimit - int64(n1)
return n, err
}
// Close closes the LimitWriter. It behaves like io.Closer.
func (w *LimitWriter) Close() error {
if closer, ok := w.Writer.(io.Closer); ok {
return closer.Close()
}
return nil
}
// LimitedWriter takes an io.Writer and returns an ioutil.LimitWriter.
func LimitedWriter(w io.Writer, skipBytes int64, limit int64) *LimitWriter {
return &LimitWriter{w, skipBytes, limit}
}

Loading…
Cancel
Save