From 120b0619662827d336f8234063df47981039caf1 Mon Sep 17 00:00:00 2001
From: Anis Elleuch
Date: Thu, 1 Mar 2018 20:37:57 +0100
Subject: [PATCH] Add multipart support in SSE-C encryption (#5576)

*) Add PUT/GET support for multipart uploads with SSE-C encryption
*) Add GET Range support for encrypted objects
*) Add support for copying encrypted parts (CopyObjectPart)
*) Support decrypting large single-PUT objects
---
 cmd/api-errors.go                  |   6 +
 cmd/bucket-handlers-listobjects.go |  24 ++-
 cmd/encryption-v1.go               | 307 ++++++++++++++++++++++++++---
 cmd/encryption-v1_test.go          |  32 +--
 cmd/fs-v1-metadata.go              |  28 +++
 cmd/fs-v1-multipart.go             |  68 ++++---
 cmd/fs-v1.go                       |  45 +----
 cmd/object-api-datatypes.go        |   6 +
 cmd/object-handlers.go             | 181 ++++++++++++++---
 cmd/xl-sets.go                     |  32 +--
 cmd/xl-v1-metadata.go              |  13 +-
 cmd/xl-v1-metadata_test.go         |   6 +-
 cmd/xl-v1-multipart.go             |  42 ++--
 cmd/xl-v1-object.go                |  21 +-
 cmd/xl-v1-utils.go                 |   8 +-
 15 files changed, 592 insertions(+), 227 deletions(-)

diff --git a/cmd/api-errors.go b/cmd/api-errors.go
index fb0d9908f..bf598f4f6 100644
--- a/cmd/api-errors.go
+++ b/cmd/api-errors.go
@@ -129,6 +129,7 @@ const (
 	// Server-Side-Encryption (with Customer provided key) related API errors.
 	ErrInsecureSSECustomerRequest
+	ErrSSEMultipartEncrypted
 	ErrSSEEncryptedObject
 	ErrInvalidEncryptionParameters
 	ErrInvalidSSECustomerAlgorithm
@@ -631,6 +632,11 @@ var errorCodeResponse = map[APIErrorCode]APIError{
 		Description:    errInsecureSSERequest.Error(),
 		HTTPStatusCode: http.StatusBadRequest,
 	},
+	ErrSSEMultipartEncrypted: {
+		Code:           "InvalidRequest",
+		Description:    "The multipart upload initiate requested encryption. Subsequent part requests must include the appropriate encryption parameters.",
+		HTTPStatusCode: http.StatusBadRequest,
+	},
 	ErrSSEEncryptedObject: {
 		Code:           "InvalidRequest",
 		Description:    errEncryptedObject.Error(),
diff --git a/cmd/bucket-handlers-listobjects.go b/cmd/bucket-handlers-listobjects.go
index 81ca1d5ba..ba5632e4c 100644
--- a/cmd/bucket-handlers-listobjects.go
+++ b/cmd/bucket-handlers-listobjects.go
@@ -94,7 +94,18 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
 		return
 	}
 
-	response := generateListObjectsV2Response(bucket, prefix, token, listObjectsV2Info.NextContinuationToken, startAfter, delimiter, fetchOwner, listObjectsV2Info.IsTruncated, maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes)
+	for i := range listObjectsV2Info.Objects {
+		if listObjectsV2Info.Objects[i].IsEncrypted() {
+			listObjectsV2Info.Objects[i].Size, err = listObjectsV2Info.Objects[i].DecryptedSize()
+			if err != nil {
+				writeErrorResponse(w, toAPIErrorCode(err), r.URL)
+				return
+			}
+		}
+	}
+
+	response := generateListObjectsV2Response(bucket, prefix, token, listObjectsV2Info.NextContinuationToken, startAfter,
+		delimiter, fetchOwner, listObjectsV2Info.IsTruncated, maxKeys, listObjectsV2Info.Objects, listObjectsV2Info.Prefixes)
 
 	// Write success response.
 	writeSuccessResponseXML(w, encodeResponse(response))
@@ -143,6 +154,17 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
 		writeErrorResponse(w, toAPIErrorCode(err), r.URL)
 		return
 	}
+
+	for i := range listObjectsInfo.Objects {
+		if listObjectsInfo.Objects[i].IsEncrypted() {
+			listObjectsInfo.Objects[i].Size, err = listObjectsInfo.Objects[i].DecryptedSize()
+			if err != nil {
+				writeErrorResponse(w, toAPIErrorCode(err), r.URL)
+				return
+			}
+		}
+	}
+
 	response := generateListObjectsV1Response(bucket, prefix, marker, delimiter, maxKeys, listObjectsInfo)
 
 	// Write success response.
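Note on the listing change above: the handlers now report the plaintext size of encrypted objects rather than their on-disk size. Below is a minimal, self-contained sketch of the size arithmetic this relies on, assuming only the patch's constants (a 64 KiB DARE payload per package plus 32 bytes of per-package overhead). The standalone function names here are hypothetical stand-ins for ObjectInfo.EncryptedSize and the patch's decryptedSize helper:

package main

import (
	"errors"
	"fmt"
)

const (
	blockSize = 64 * 1024 // plaintext payload carried by one DARE package
	metaSize  = 32        // per-package header and authentication tag
)

// encryptedSize mirrors ObjectInfo.EncryptedSize: every full 64KiB block
// grows by 32 bytes, and a trailing partial block becomes one final,
// smaller package that still carries its own 32 bytes of overhead.
func encryptedSize(size int64) int64 {
	enc := (size / blockSize) * (blockSize + metaSize)
	if mod := size % blockSize; mod > 0 {
		enc += mod + metaSize
	}
	return enc
}

// plaintextSize inverts the mapping, like decryptedSize in the patch. A
// trailing package smaller than metaSize+1 bytes cannot hold even one
// payload byte, so such an object must have been tampered with.
func plaintextSize(encSize int64) (int64, error) {
	size := (encSize / (blockSize + metaSize)) * blockSize
	if mod := encSize % (blockSize + metaSize); mod > 0 {
		if mod < metaSize+1 {
			return -1, errors.New("object tampered")
		}
		size += mod - metaSize
	}
	return size, nil
}

func main() {
	fmt.Println(encryptedSize(64*1024 + 1)) // 65601
	fmt.Println(plaintextSize(65601))       // 65537 <nil>
}

Running it shows why a 65537-byte object occupies 65601 bytes on disk: one full 65568-byte package plus a final 33-byte package holding the last payload byte.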
diff --git a/cmd/encryption-v1.go b/cmd/encryption-v1.go index ec4014495..ab01743e2 100644 --- a/cmd/encryption-v1.go +++ b/cmd/encryption-v1.go @@ -18,10 +18,12 @@ package cmd import ( "bytes" + "crypto/hmac" "crypto/md5" "crypto/rand" "crypto/subtle" "encoding/base64" + "encoding/binary" "errors" "io" "net/http" @@ -66,8 +68,18 @@ const ( // Currently AWS supports only AES256. So the SSE-C key size is fixed to 32 bytes. SSECustomerKeySize = 32 + // SSEIVSize is the size of the IV data + SSEIVSize = 32 // 32 bytes + // SSECustomerAlgorithmAES256 the only valid S3 SSE-C encryption algorithm identifier. SSECustomerAlgorithmAES256 = "AES256" + + // SSE dare package block size. + sseDAREPackageBlockSize = 64 * 1024 // 64KiB bytes + + // SSE dare package meta padding bytes. + sseDAREPackageMetaSize = 32 // 32 bytes + ) // SSE-C key derivation, key verification and key update: @@ -238,7 +250,7 @@ func rotateKey(oldKey []byte, newKey []byte, metadata map[string]string) error { return errObjectTampered } iv, err := base64.StdEncoding.DecodeString(metadata[ServerSideEncryptionIV]) - if err != nil || len(iv) != 32 { + if err != nil || len(iv) != SSEIVSize { return errObjectTampered } sealedKey, err := base64.StdEncoding.DecodeString(metadata[ServerSideEncryptionSealedKey]) @@ -286,7 +298,7 @@ func rotateKey(oldKey []byte, newKey []byte, metadata map[string]string) error { return nil } -func newEncryptReader(content io.Reader, key []byte, metadata map[string]string) (io.Reader, error) { +func newEncryptMetadata(key []byte, metadata map[string]string) ([]byte, error) { delete(metadata, SSECustomerKey) // make sure we do not save the key by accident // security notice: @@ -297,7 +309,7 @@ func newEncryptReader(content io.Reader, key []byte, metadata map[string]string) // authenticated en/decryption scheme. The DARE format will generate an 8 byte nonce which must // be repeated in addition to reveal the object encryption key. // [ P(coll) ~= 1 / 2^((256 + 64) / 2) ] - nonce := make([]byte, 64) // generate random values for key derivation + nonce := make([]byte, 32+SSEIVSize) // generate random values for key derivation if _, err := io.ReadFull(rand.Reader, nonce); err != nil { return nil, err } @@ -320,14 +332,24 @@ func newEncryptReader(content io.Reader, key []byte, metadata map[string]string) return nil, errors.New("failed to seal object encryption key") // if this happens there's a bug in the code (may panic ?) 
 	}
+	metadata[ServerSideEncryptionIV] = base64.StdEncoding.EncodeToString(iv[:])
+	metadata[ServerSideEncryptionSealAlgorithm] = SSESealAlgorithmDareSha256
+	metadata[ServerSideEncryptionSealedKey] = base64.StdEncoding.EncodeToString(sealedKey.Bytes())
+
+	return objectEncryptionKey, nil
+}
+
+func newEncryptReader(content io.Reader, key []byte, metadata map[string]string) (io.Reader, error) {
+	objectEncryptionKey, err := newEncryptMetadata(key, metadata)
+	if err != nil {
+		return nil, err
+	}
+
 	reader, err := sio.EncryptReader(content, sio.Config{Key: objectEncryptionKey})
 	if err != nil {
 		return nil, errInvalidSSEKey
 	}
-	metadata[ServerSideEncryptionIV] = base64.StdEncoding.EncodeToString(iv[:])
-	metadata[ServerSideEncryptionSealAlgorithm] = SSESealAlgorithmDareSha256
-	metadata[ServerSideEncryptionSealedKey] = base64.StdEncoding.EncodeToString(sealedKey.Bytes())
 	return reader, nil
 }
@@ -353,12 +375,12 @@ func DecryptCopyRequest(client io.Writer, r *http.Request, metadata map[string]s
 	return newDecryptWriter(client, key, 0, metadata)
 }
 
-func newDecryptWriter(client io.Writer, key []byte, seqNumber uint32, metadata map[string]string) (io.WriteCloser, error) {
+func decryptObjectInfo(key []byte, metadata map[string]string) ([]byte, error) {
 	if metadata[ServerSideEncryptionSealAlgorithm] != SSESealAlgorithmDareSha256 { // currently DARE-SHA256 is the only option
 		return nil, errObjectTampered
 	}
 	iv, err := base64.StdEncoding.DecodeString(metadata[ServerSideEncryptionIV])
-	if err != nil || len(iv) != 32 {
+	if err != nil || len(iv) != SSEIVSize {
 		return nil, errObjectTampered
 	}
 	sealedKey, err := base64.StdEncoding.DecodeString(metadata[ServerSideEncryptionSealedKey])
@@ -380,9 +402,21 @@ func newDecryptWriter(client io.Writer, key []byte, seqNumber uint32, metadata m
 		// To provide strict AWS S3 compatibility we return: access denied.
 		return nil, errSSEKeyMismatch
 	}
+	return objectEncryptionKey.Bytes(), nil
+}
 
+func newDecryptWriter(client io.Writer, key []byte, seqNumber uint32, metadata map[string]string) (io.WriteCloser, error) {
+	objectEncryptionKey, err := decryptObjectInfo(key, metadata)
+	if err != nil {
+		return nil, err
+	}
+	return newDecryptWriterWithObjectKey(client, objectEncryptionKey, seqNumber, metadata)
+}
+
+func newDecryptWriterWithObjectKey(client io.Writer, objectEncryptionKey []byte, seqNumber uint32, metadata map[string]string) (io.WriteCloser, error) {
 	writer, err := sio.DecryptWriter(client, sio.Config{
-		Key:            objectEncryptionKey.Bytes(),
+		Key:            objectEncryptionKey,
 		SequenceNumber: seqNumber,
 	})
 	if err != nil {
@@ -412,16 +446,208 @@ func DecryptRequest(client io.Writer, r *http.Request, metadata map[string]strin
 	return DecryptRequestWithSequenceNumber(client, r, 0, metadata)
 }
 
-// getStartOffset - get sequence number, start offset and rlength.
-func getStartOffset(offset, length int64) (seqNumber uint32, startOffset int64, rlength int64) {
-	seqNumber = uint32(offset / (64 * 1024))
-	startOffset = int64(seqNumber) * (64*1024 + 32)
+// DecryptBlocksWriter - decrypts multipart parts, while implementing an io.Writer compatible interface.
+type DecryptBlocksWriter struct {
+	// Original writer where the plain data will be written
+	writer io.Writer
+	// Current decrypter for the current encrypted data block
+	decrypter io.WriteCloser
+	// Start sequence number
+	startSeqNum uint32
+	// Current part index
+	partIndex int
+	// Parts information
+	parts    []objectPartInfo
+	req      *http.Request
+	metadata map[string]string
+
+	partEncRelOffset int64
+
+	// Customer Key
+	customerKeyHeader string
+}
+
+func (w *DecryptBlocksWriter) buildDecrypter(partID int) error {
+	m := make(map[string]string)
+	for k, v := range w.metadata {
+		m[k] = v
+	}
+	// Initialize the first decrypter here; new decrypters are initialized in Write() as needed.
+	w.req.Header.Set(SSECustomerKey, w.customerKeyHeader)
+	key, err := ParseSSECustomerRequest(w.req)
+	if err != nil {
+		return err
+	}
+
+	objectEncryptionKey, err := decryptObjectInfo(key, m)
+	if err != nil {
+		return err
+	}
+
+	var partIDbin [4]byte
+	binary.LittleEndian.PutUint32(partIDbin[:], uint32(partID)) // marshal part ID
+
+	mac := hmac.New(sha256.New, objectEncryptionKey) // derive part encryption key from part ID and object key
+	mac.Write(partIDbin[:])
+	partEncryptionKey := mac.Sum(nil)
+
+	delete(m, SSECustomerKey) // make sure we do not save the key by accident
+
+	decrypter, err := newDecryptWriterWithObjectKey(w.writer, partEncryptionKey, w.startSeqNum, m)
+	if err != nil {
+		return err
+	}
 
-	rlength = (length / (64 * 1024)) * (64*1024 + 32)
-	if length%(64*1024) > 0 {
-		rlength += 64*1024 + 32
+	if w.decrypter != nil {
+		err = w.decrypter.Close()
+		if err != nil {
+			return err
+		}
 	}
-	return seqNumber, startOffset, rlength
+
+	w.decrypter = decrypter
+	return nil
+}
+
+func (w *DecryptBlocksWriter) Write(p []byte) (int, error) {
+	var err error
+	var n1 int
+	if int64(len(p)) < w.parts[w.partIndex].Size-w.partEncRelOffset {
+		n1, err = w.decrypter.Write(p)
+		if err != nil {
+			return 0, err
+		}
+		w.partEncRelOffset += int64(n1)
+	} else {
+		n1, err = w.decrypter.Write(p[:w.parts[w.partIndex].Size-w.partEncRelOffset])
+		if err != nil {
+			return 0, err
+		}
+
+		// We should now proceed to next part, reset all values appropriately.
+		w.partEncRelOffset = 0
+		w.startSeqNum = 0
+
+		w.partIndex++
+
+		err = w.buildDecrypter(w.partIndex + 1)
+		if err != nil {
+			return 0, err
+		}
+
+		n1, err = w.decrypter.Write(p[n1:])
+		if err != nil {
+			return 0, err
+		}
+
+		w.partEncRelOffset += int64(n1)
+	}
+
+	return len(p), nil
+}
+
+// Close closes the DecryptBlocksWriter and, if the underlying writer implements io.Closer, closes it as well.
+func (w *DecryptBlocksWriter) Close() error {
+	if w.decrypter != nil {
+		err := w.decrypter.Close()
+		if err != nil {
+			return err
+		}
+	}
+
+	if closer, ok := w.writer.(io.Closer); ok {
+		return closer.Close()
+	}
+	return nil
+}
+
+// DecryptBlocksRequest - set up a writer which can decrypt many concatenated encrypted data
+// parts; the parts information gives the boundaries of each encrypted data block.
+func DecryptBlocksRequest(client io.Writer, r *http.Request, startOffset, length int64, objInfo ObjectInfo) (io.WriteCloser, int64, int64, error) {
+	seqNumber, encStartOffset, encLength := getEncryptedStartOffset(startOffset, length)
+
+	// The encrypted length cannot exceed the encrypted object size. Reading
+	// past the end is allowed in AWS S3, so we simply clamp to EncryptedSize().
+	if encLength+encStartOffset > objInfo.EncryptedSize() {
+		encLength = objInfo.EncryptedSize() - encStartOffset
+	}
+
+	if len(objInfo.Parts) == 0 || !objInfo.IsEncryptedMultipart() {
+		writer, err := DecryptRequestWithSequenceNumber(client, r, seqNumber, objInfo.UserDefined)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+
+		return writer, encStartOffset, encLength, nil
+	}
+
+	var partStartIndex int
+	var partStartOffset = startOffset
+	// Skip parts until the requested offset maps into a particular part offset.
+	for i, part := range objInfo.Parts {
+		decryptedSize, err := decryptedSize(part.Size)
+		if err != nil {
+			return nil, -1, -1, err
+		}
+
+		partStartIndex = i
+
+		// The offset is smaller than the part size: we have reached the
+		// right part, so break out and start decrypting from this part
+		// index.
+		if partStartOffset < decryptedSize {
+			break
+		}
+
+		// Continue to look for next part.
+		partStartOffset -= decryptedSize
+	}
+
+	startSeqNum := partStartOffset / sseDAREPackageBlockSize
+	partEncRelOffset := int64(startSeqNum) * (sseDAREPackageBlockSize + sseDAREPackageMetaSize)
+
+	w := &DecryptBlocksWriter{
+		writer:            client,
+		startSeqNum:       uint32(startSeqNum),
+		partEncRelOffset:  partEncRelOffset,
+		parts:             objInfo.Parts,
+		partIndex:         partStartIndex,
+		req:               r,
+		customerKeyHeader: r.Header.Get(SSECustomerKey),
+		metadata:          objInfo.UserDefined,
+	}
+
+	if err := w.buildDecrypter(partStartIndex + 1); err != nil {
+		return nil, 0, 0, err
+	}
+
+	return w, encStartOffset, encLength, nil
+}
+
+// getEncryptedStartOffset - fetch sequence number, encrypted start offset and encrypted length.
+func getEncryptedStartOffset(offset, length int64) (seqNumber uint32, encOffset int64, encLength int64) {
+	onePkgSize := int64(sseDAREPackageBlockSize + sseDAREPackageMetaSize)
+
+	seqNumber = uint32(offset / sseDAREPackageBlockSize)
+	encOffset = int64(seqNumber) * onePkgSize
+	// To compute the encrypted length, divide the end of the requested
+	// range (offset+length) by the 64KiB block size, which is the payload
+	// size of each encrypted block, then multiply by the full package
+	// size, which is 64KiB + 32 bytes of metadata. Finally subtract the
+	// encrypted offset to get the encrypted length to read from disk.
+	encLength = ((offset+length)/sseDAREPackageBlockSize)*onePkgSize - encOffset
+
+	// Account for any remainder, to figure out if we need one extra package to read from.
+	if (offset+length)%sseDAREPackageBlockSize > 0 {
+		encLength += onePkgSize
+	}
+
+	return seqNumber, encOffset, encLength
+}
+
+// IsEncryptedMultipart - was the object uploaded as an encrypted multipart object?
+func (o *ObjectInfo) IsEncryptedMultipart() bool {
+	_, ok := o.UserDefined[ReservedMetadataPrefix+"Encrypted-Multipart"]
+	return ok
+}
 
 // IsEncrypted returns true if the object is marked as encrypted.
@@ -438,6 +664,34 @@ func (o *ObjectInfo) IsEncrypted() bool {
 	return false
 }
 
+// IsEncrypted returns true if the listed multipart upload is marked as encrypted.
+func (li *ListPartsInfo) IsEncrypted() bool { + if _, ok := li.UserDefined[ServerSideEncryptionIV]; ok { + return true + } + if _, ok := li.UserDefined[ServerSideEncryptionSealAlgorithm]; ok { + return true + } + if _, ok := li.UserDefined[ServerSideEncryptionSealedKey]; ok { + return true + } + return false +} + +func decryptedSize(encryptedSize int64) (int64, error) { + if encryptedSize == 0 { + return encryptedSize, nil + } + size := (encryptedSize / (sseDAREPackageBlockSize + sseDAREPackageMetaSize)) * sseDAREPackageBlockSize + if mod := encryptedSize % (sseDAREPackageBlockSize + sseDAREPackageMetaSize); mod > 0 { + if mod < sseDAREPackageMetaSize+1 { + return -1, errObjectTampered // object is not 0 size but smaller than the smallest valid encrypted object + } + size += mod - sseDAREPackageMetaSize + } + return size, nil +} + // DecryptedSize returns the size of the object after decryption in bytes. // It returns an error if the object is not encrypted or marked as encrypted // but has an invalid size. @@ -446,26 +700,17 @@ func (o *ObjectInfo) DecryptedSize() (int64, error) { if !o.IsEncrypted() { panic("cannot compute decrypted size of an object which is not encrypted") } - if o.Size == 0 { - return o.Size, nil - } - size := (o.Size / (32 + 64*1024)) * (64 * 1024) - if mod := o.Size % (32 + 64*1024); mod > 0 { - if mod < 33 { - return -1, errObjectTampered // object is not 0 size but smaller than the smallest valid encrypted object - } - size += mod - 32 - } - return size, nil + + return decryptedSize(o.Size) } // EncryptedSize returns the size of the object after encryption. // An encrypted object is always larger than a plain object // except for zero size objects. func (o *ObjectInfo) EncryptedSize() int64 { - size := (o.Size / (64 * 1024)) * (32 + 64*1024) - if mod := o.Size % (64 * 1024); mod > 0 { - size += mod + 32 + size := (o.Size / sseDAREPackageBlockSize) * (sseDAREPackageBlockSize + sseDAREPackageMetaSize) + if mod := o.Size % (sseDAREPackageBlockSize); mod > 0 { + size += mod + sseDAREPackageMetaSize } return size } diff --git a/cmd/encryption-v1_test.go b/cmd/encryption-v1_test.go index efe98111d..4c0669fc7 100644 --- a/cmd/encryption-v1_test.go +++ b/cmd/encryption-v1_test.go @@ -278,13 +278,13 @@ func TestParseSSECopyCustomerRequest(t *testing.T) { var encryptedSizeTests = []struct { size, encsize int64 }{ - {size: 0, encsize: 0}, // 0 - {size: 1, encsize: 33}, // 1 - {size: 1024, encsize: 1024 + 32}, // 2 - {size: 2 * 64 * 1024, encsize: 2 * (64*1024 + 32)}, // 3 - {size: 100*64*1024 + 1, encsize: 100*(64*1024+32) + 33}, // 4 - {size: 64*1024 + 1, encsize: (64*1024 + 32) + 33}, // 5 - {size: 5 * 1024 * 1024 * 1024, encsize: 81920 * (64*1024 + 32)}, // 6 + {size: 0, encsize: 0}, // 0 + {size: 1, encsize: 33}, // 1 + {size: 1024, encsize: 1024 + 32}, // 2 + {size: 2 * sseDAREPackageBlockSize, encsize: 2 * (sseDAREPackageBlockSize + 32)}, // 3 + {size: 100*sseDAREPackageBlockSize + 1, encsize: 100*(sseDAREPackageBlockSize+32) + 33}, // 4 + {size: sseDAREPackageBlockSize + 1, encsize: (sseDAREPackageBlockSize + 32) + 33}, // 5 + {size: 5 * 1024 * 1024 * 1024, encsize: 81920 * (sseDAREPackageBlockSize + 32)}, // 6 } func TestEncryptedSize(t *testing.T) { @@ -300,15 +300,15 @@ var decryptSSECustomerObjectInfoTests = []struct { encsize, size int64 err error }{ - {encsize: 0, size: 0, err: nil}, // 0 - {encsize: 33, size: 1, err: nil}, // 1 - {encsize: 1024 + 32, size: 1024, err: nil}, // 2 - {encsize: 2 * (64*1024 + 32), size: 2 * 64 * 1024, err: nil}, // 3 - {encsize: 
100*(64*1024+32) + 33, size: 100*64*1024 + 1, err: nil}, // 4 - {encsize: (64*1024 + 32) + 33, size: 64*1024 + 1, err: nil}, // 5 - {encsize: 81920 * (64*1024 + 32), size: 5 * 1024 * 1024 * 1024, err: nil}, // 6 - {encsize: 0, size: 0, err: nil}, // 7 - {encsize: 64*1024 + 32 + 31, size: 0, err: errObjectTampered}, // 8 + {encsize: 0, size: 0, err: nil}, // 0 + {encsize: 33, size: 1, err: nil}, // 1 + {encsize: 1024 + 32, size: 1024, err: nil}, // 2 + {encsize: 2 * (sseDAREPackageBlockSize + 32), size: 2 * sseDAREPackageBlockSize, err: nil}, // 3 + {encsize: 100*(sseDAREPackageBlockSize+32) + 33, size: 100*sseDAREPackageBlockSize + 1, err: nil}, // 4 + {encsize: (sseDAREPackageBlockSize + 32) + 33, size: sseDAREPackageBlockSize + 1, err: nil}, // 5 + {encsize: 81920 * (sseDAREPackageBlockSize + 32), size: 5 * 1024 * 1024 * 1024, err: nil}, // 6 + {encsize: 0, size: 0, err: nil}, // 7 + {encsize: sseDAREPackageBlockSize + 32 + 31, size: 0, err: errObjectTampered}, // 8 } func TestDecryptedSize(t *testing.T) { diff --git a/cmd/fs-v1-metadata.go b/cmd/fs-v1-metadata.go index 55eb515f4..fcb1ed075 100644 --- a/cmd/fs-v1-metadata.go +++ b/cmd/fs-v1-metadata.go @@ -121,6 +121,9 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo // response headers. e.g, X-Minio-* or X-Amz-*. objInfo.UserDefined = cleanMetadata(m.Meta) + // All the parts per object. + objInfo.Parts = m.Parts + // Success.. return objInfo } @@ -158,6 +161,28 @@ func parseFSMetaMap(fsMetaBuf []byte) map[string]string { return metaMap } +func parseFSPartsArray(fsMetaBuf []byte) []objectPartInfo { + // Get xlMetaV1.Parts array + var partsArray []objectPartInfo + + partsArrayResult := gjson.GetBytes(fsMetaBuf, "parts") + partsArrayResult.ForEach(func(key, part gjson.Result) bool { + partJSON := part.String() + number := gjson.Get(partJSON, "number").Int() + name := gjson.Get(partJSON, "name").String() + etag := gjson.Get(partJSON, "etag").String() + size := gjson.Get(partJSON, "size").Int() + partsArray = append(partsArray, objectPartInfo{ + Number: int(number), + Name: name, + ETag: etag, + Size: size, + }) + return true + }) + return partsArray +} + func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) { var fsMetaBuf []byte fi, err := lk.Stat() @@ -186,6 +211,9 @@ func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) { return 0, errors.Trace(errCorruptedFormat) } + // obtain parts information + m.Parts = parseFSPartsArray(fsMetaBuf) + // obtain metadata. m.Meta = parseFSMetaMap(fsMetaBuf) diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 073fdd903..e6aee8efc 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -20,7 +20,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "io" "io/ioutil" "os" pathutil "path" @@ -232,6 +231,7 @@ func (fs *FSObjects) NewMultipartUpload(bucket, object string, meta map[string]s if err = ioutil.WriteFile(pathJoin(uploadIDDir, fsMetaJSONFile), fsMetaBytes, 0644); err != nil { return "", errors.Trace(err) } + return uploadID, nil } @@ -246,30 +246,26 @@ func (fs *FSObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, } // Initialize pipe. 
-	pipeReader, pipeWriter := io.Pipe()
-
 	go func() {
-		if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil {
+		if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag); gerr != nil {
+			if gerr = srcInfo.Writer.Close(); gerr != nil {
+				errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
+				return
+			}
+			return
+		}
+		// Close writer explicitly signalling we wrote all data.
+		if gerr := srcInfo.Writer.Close(); gerr != nil {
 			errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
-			pipeWriter.CloseWithError(gerr)
 			return
 		}
-		pipeWriter.Close() // Close writer explicitly signalling we wrote all data.
 	}()
 
-	hashReader, err := hash.NewReader(pipeReader, length, "", "")
-	if err != nil {
-		return pi, toObjectErr(err, dstBucket, dstObject)
-	}
-
-	partInfo, err := fs.PutObjectPart(dstBucket, dstObject, uploadID, partID, hashReader)
+	partInfo, err := fs.PutObjectPart(dstBucket, dstObject, uploadID, partID, srcInfo.Reader)
 	if err != nil {
 		return pi, toObjectErr(err, dstBucket, dstObject)
 	}
 
-	// Explicitly close the reader.
-	pipeReader.Close()
-
 	return partInfo, nil
 }
 
@@ -392,27 +388,28 @@ func (fs *FSObjects) ListObjectParts(bucket, object, uploadID string, partNumber
 		if entry == fsMetaJSONFile {
 			continue
 		}
-		partNumber, etag1, err := fs.decodePartFile(entry)
-		if err != nil {
-			return result, toObjectErr(errors.Trace(err))
+		partNumber, etag1, derr := fs.decodePartFile(entry)
+		if derr != nil {
+			return result, toObjectErr(errors.Trace(derr))
 		}
 		etag2, ok := partsMap[partNumber]
 		if !ok {
 			partsMap[partNumber] = etag1
 			continue
 		}
-		stat1, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag1)))
-		if err != nil {
-			return result, toObjectErr(errors.Trace(err))
+		stat1, serr := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag1)))
+		if serr != nil {
+			return result, toObjectErr(errors.Trace(serr))
 		}
-		stat2, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag2)))
-		if err != nil {
-			return result, toObjectErr(errors.Trace(err))
+		stat2, serr := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag2)))
+		if serr != nil {
+			return result, toObjectErr(errors.Trace(serr))
 		}
 		if stat1.ModTime().After(stat2.ModTime()) {
 			partsMap[partNumber] = etag1
 		}
 	}
+
 	var parts []PartInfo
 	for partNumber, etag := range partsMap {
 		parts = append(parts, PartInfo{PartNumber: partNumber, ETag: etag})
@@ -444,13 +441,21 @@ func (fs *FSObjects) ListObjectParts(bucket, object, uploadID string, partNumber
 		}
 	}
 	for i, part := range result.Parts {
-		stat, err := fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)))
+		var stat os.FileInfo
+		stat, err = fsStatFile(pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)))
 		if err != nil {
 			return result, toObjectErr(errors.Trace(err))
 		}
 		result.Parts[i].LastModified = stat.ModTime()
 		result.Parts[i].Size = stat.Size()
 	}
+
+	fsMetaBytes, err := ioutil.ReadFile(pathJoin(uploadIDDir, fsMetaJSONFile))
+	if err != nil {
+		return result, errors.Trace(err)
+	}
+
+	result.UserDefined = parseFSMetaMap(fsMetaBytes)
 	return result, nil
 }
 
@@ -492,6 +497,11 @@ func (fs *FSObjects) CompleteMultipartUpload(bucket string, object string, uploa
 
 	partSize := int64(-1) // Used later to ensure that all parts sizes are same.
 
+	fsMeta := fsMetaV1{}
+
+	// Allocate parts similar to incoming slice.
+	fsMeta.Parts = make([]objectPartInfo, len(parts))
+
 	// Validate all parts and then commit to disk.
for i, part := range parts { partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)) @@ -506,6 +516,13 @@ func (fs *FSObjects) CompleteMultipartUpload(bucket string, object string, uploa if partSize == -1 { partSize = fi.Size() } + + fsMeta.Parts[i] = objectPartInfo{ + Number: part.PartNumber, + ETag: part.ETag, + Size: fi.Size(), + } + if i == len(parts)-1 { break } @@ -590,7 +607,6 @@ func (fs *FSObjects) CompleteMultipartUpload(bucket string, object string, uploa } defer metaFile.Close() - fsMeta := fsMetaV1{} // Read saved fs metadata for ongoing multipart. fsMetaBuf, err := ioutil.ReadFile(pathJoin(uploadIDDir, fsMetaJSONFile)) if err != nil { diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index ef9c8d611..c5ee9c2f8 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -497,6 +497,7 @@ func (fs *FSObjects) getObject(bucket, object string, offset int64, length int64 return toObjectErr(errors.Trace(InvalidETag{}), bucket, object) } } + // Read the object, doesn't exist returns an s3 compatible error. fsObjPath := pathJoin(fs.fsPath, bucket, object) reader, size, err := fsOpenFile(fsObjPath, offset) @@ -918,48 +919,8 @@ func (fs *FSObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKe if err = objectLock.GetRLock(globalListingTimeout); err != nil { return ObjectInfo{}, err } - - if hasSuffix(entry, slashSeparator) { - var fi os.FileInfo - fi, err = fsStatDir(pathJoin(fs.fsPath, bucket, entry)) - objectLock.RUnlock() - if err != nil { - return objInfo, err - } - // Success. - return ObjectInfo{ - // Object name needs to be full path. - Name: entry, - Bucket: bucket, - Size: fi.Size(), - ModTime: fi.ModTime(), - IsDir: fi.IsDir(), - }, nil - } - - var etag string - etag, err = fs.getObjectETag(bucket, entry) - objectLock.RUnlock() - if err != nil { - return ObjectInfo{}, err - } - - // Stat the file to get file size. - var fi os.FileInfo - fi, err = fsStatFile(pathJoin(fs.fsPath, bucket, entry)) - if err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, entry) - } - - // Success. - return ObjectInfo{ - Name: entry, - Bucket: bucket, - Size: fi.Size(), - ModTime: fi.ModTime(), - IsDir: fi.IsDir(), - ETag: etag, - }, nil + defer objectLock.RUnlock() + return fs.getObjectInfo(bucket, entry) } heal := false // true only for xl.ListObjectsHeal() diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 450883a8d..5197aeed5 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -101,6 +101,9 @@ type ObjectInfo struct { // User-Defined metadata UserDefined map[string]string + // List of individual parts, maximum size of upto 10,000 + Parts []objectPartInfo `json:"-"` + // Implements writer and reader used by CopyObject API Writer io.WriteCloser `json:"-"` Reader *hash.Reader `json:"-"` @@ -138,6 +141,9 @@ type ListPartsInfo struct { // List of all parts. Parts []PartInfo + // Any metadata set during InitMultipartUpload, including encryption headers. + UserDefined map[string]string + EncodingType string // Not supported yet. 
} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index c3006fd2a..c8b89b32c 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -17,6 +17,8 @@ package cmd import ( + "crypto/hmac" + "encoding/binary" "encoding/hex" "encoding/xml" "fmt" @@ -32,6 +34,8 @@ import ( "github.com/minio/minio/pkg/errors" "github.com/minio/minio/pkg/hash" "github.com/minio/minio/pkg/ioutil" + sha256 "github.com/minio/sha256-simd" + "github.com/minio/sio" ) // supportedHeadGetReqParams - supported request parameters for GET and HEAD presigned request. @@ -163,13 +167,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req // additionally also skipping mod(offset)64KiB boundaries. writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length) - var sequenceNumber uint32 - sequenceNumber, startOffset, length = getStartOffset(startOffset, length) - if length > objInfo.EncryptedSize() { - length = objInfo.EncryptedSize() - } - - writer, err = DecryptRequestWithSequenceNumber(writer, r, sequenceNumber, objInfo.UserDefined) + writer, startOffset, length, err = DecryptBlocksRequest(writer, r, startOffset, length, objInfo) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -747,10 +745,25 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r } } - if IsSSECustomerRequest(r.Header) { // handle SSE-C requests - // SSE-C is not implemented for multipart operations yet - writeErrorResponse(w, ErrNotImplemented, r.URL) - return + var encMetadata = map[string]string{} + + if objectAPI.IsEncryptionSupported() { + if IsSSECustomerRequest(r.Header) { + key, err := ParseSSECustomerRequest(r) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + _, err = newEncryptMetadata(key, encMetadata) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + // Set this for multipart only operations, we need to differentiate during + // decryption if the file was actually multipart or not. + encMetadata[ReservedMetadataPrefix+"Encrypted-Multipart"] = "" + } } // Extract metadata that needs to be saved. @@ -761,6 +774,12 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r return } + // We need to preserve the encryption headers set in EncryptRequest, + // so we do not want to override them, copy them instead. + for k, v := range encMetadata { + metadata[k] = v + } + uploadID, err := objectAPI.NewMultipartUpload(bucket, object, metadata) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) @@ -791,12 +810,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } - if IsSSECustomerRequest(r.Header) { // handle SSE-C requests - // SSE-C is not implemented for multipart operations yet - writeErrorResponse(w, ErrNotImplemented, r.URL) - return - } - // Copy source path. cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source")) if err != nil { @@ -832,6 +845,13 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } + if objectAPI.IsEncryptionSupported() { + if apiErr, _ := DecryptCopyObjectInfo(&srcInfo, r.Header); apiErr != ErrNone { + writeErrorResponse(w, apiErr, r.URL) + return + } + } + // Get request range. 
var hrange *httpRange rangeHeader := r.Header.Get("x-amz-copy-source-range") @@ -864,8 +884,70 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } - // Make sure to remove all metadata from source for for multipart operations. - srcInfo.UserDefined = nil + // Initialize pipe. + pipeReader, pipeWriter := io.Pipe() + + var writer io.WriteCloser = pipeWriter + var reader io.Reader = pipeReader + if objectAPI.IsEncryptionSupported() { + var li ListPartsInfo + li, err = objectAPI.ListObjectParts(dstBucket, dstObject, uploadID, 0, 1) + if err != nil { + pipeWriter.CloseWithError(err) + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + if li.IsEncrypted() { + if !IsSSECustomerRequest(r.Header) { + writeErrorResponse(w, ErrSSEMultipartEncrypted, r.URL) + return + } + var key []byte + key, err = ParseSSECustomerRequest(r) + if err != nil { + pipeWriter.CloseWithError(err) + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + // Calculating object encryption key + var objectEncryptionKey []byte + objectEncryptionKey, err = decryptObjectInfo(key, li.UserDefined) + if err != nil { + pipeWriter.CloseWithError(err) + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + reader, err = sio.EncryptReader(reader, sio.Config{Key: objectEncryptionKey}) + if err != nil { + pipeWriter.CloseWithError(err) + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + } + if IsSSECopyCustomerRequest(r.Header) { + // Response writer should be limited early on for decryption upto required length, + // additionally also skipping mod(offset)64KiB boundaries. + writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length) + + writer, startOffset, length, err = DecryptBlocksRequest(pipeWriter, r, startOffset, length, srcInfo) + if err != nil { + pipeWriter.CloseWithError(err) + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + } + } + + hashReader, err := hash.NewReader(reader, length, "", "") // do not try to verify encrypted content + if err != nil { + pipeWriter.CloseWithError(err) + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + srcInfo.Reader = hashReader + srcInfo.Writer = writer // Copy source object to destination, if source and destination // object is same then only metadata is updated. @@ -876,6 +958,9 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } + // Close the pipe after successful operation. 
+ pipeReader.Close() + response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified) encodedSuccessResponse := encodeResponse(response) @@ -908,12 +993,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } - if IsSSECustomerRequest(r.Header) { // handle SSE-C requests - // SSE-C is not implemented for multipart operations yet - writeErrorResponse(w, ErrNotImplemented, r.URL) - return - } - /// if Content-Length is unknown/missing, throw away size := r.ContentLength @@ -956,8 +1035,9 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http var ( md5hex = hex.EncodeToString(md5Bytes) sha256hex = "" - reader = r.Body + reader io.Reader ) + reader = r.Body switch rAuthType { default: @@ -1006,6 +1086,55 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } + if objectAPI.IsEncryptionSupported() { + var li ListPartsInfo + li, err = objectAPI.ListObjectParts(bucket, object, uploadID, 0, 1) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + if li.IsEncrypted() { + if !IsSSECustomerRequest(r.Header) { + writeErrorResponse(w, ErrSSEMultipartEncrypted, r.URL) + return + } + var key []byte + key, err = ParseSSECustomerRequest(r) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + // Calculating object encryption key + var objectEncryptionKey []byte + objectEncryptionKey, err = decryptObjectInfo(key, li.UserDefined) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + var partIDbin [4]byte + binary.LittleEndian.PutUint32(partIDbin[:], uint32(partID)) // marshal part ID + + mac := hmac.New(sha256.New, objectEncryptionKey) // derive part encryption key from part ID and object key + mac.Write(partIDbin[:]) + partEncryptionKey := mac.Sum(nil) + + reader, err = sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey}) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + info := ObjectInfo{Size: size} + hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "") // do not try to verify encrypted content + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + } + } + partInfo, err := objectAPI.PutObjectPart(bucket, object, uploadID, partID, hashReader) if err != nil { // Verify if the underlying error is signature mismatch. diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 5b4df7db5..9e8056197 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -783,36 +783,20 @@ func (s *xlSets) CopyObjectPart(srcBucket, srcObject, destBucket, destObject str srcSet := s.getHashedSet(srcObject) destSet := s.getHashedSet(destObject) - // Initialize pipe to stream from source. - pipeReader, pipeWriter := io.Pipe() go func() { - if gerr := srcSet.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil { + if gerr := srcSet.GetObject(srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag); gerr != nil { + if gerr = srcInfo.Writer.Close(); gerr != nil { + errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject) + return + } + } + if gerr := srcInfo.Writer.Close(); gerr != nil { errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject) - pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject)) return } - - // Close writer explicitly signalling we wrote all data. 
- pipeWriter.Close() - return }() - hashReader, err := hash.NewReader(pipeReader, length, "", "") - if err != nil { - pipeReader.CloseWithError(err) - return partInfo, toObjectErr(errors.Trace(err), destBucket, destObject) - } - - partInfo, err = destSet.PutObjectPart(destBucket, destObject, uploadID, partID, hashReader) - if err != nil { - pipeReader.CloseWithError(err) - return partInfo, err - } - - // Close the pipe - pipeReader.Close() - - return partInfo, nil + return destSet.PutObjectPart(destBucket, destObject, uploadID, partID, srcInfo.Reader) } // PutObjectPart - writes part of an object to hashedSet based on the object name. diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 83c020dec..c33b5e1a3 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -312,6 +312,9 @@ func (m xlMetaV1) ToObjectInfo(bucket, object string) ObjectInfo { // response headers. e.g, X-Minio-* or X-Amz-*. objInfo.UserDefined = cleanMetadata(m.Meta) + // All the parts per object. + objInfo.Parts = m.Parts + // Success. return objInfo } @@ -388,16 +391,16 @@ func pickValidXLMeta(metaArr []xlMetaV1, modTime time.Time) (xmv xlMetaV1, e err var objMetadataOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound, errFileAccessDenied, errCorruptedFormat) // readXLMetaParts - returns the XL Metadata Parts from xl.json of one of the disks picked at random. -func (xl xlObjects) readXLMetaParts(bucket, object string) (xlMetaParts []objectPartInfo, err error) { +func (xl xlObjects) readXLMetaParts(bucket, object string) (xlMetaParts []objectPartInfo, xlMeta map[string]string, err error) { var ignoredErrs []error for _, disk := range xl.getLoadBalancedDisks() { if disk == nil { ignoredErrs = append(ignoredErrs, errDiskNotFound) continue } - xlMetaParts, err = readXLMetaParts(disk, bucket, object) + xlMetaParts, xlMeta, err = readXLMetaParts(disk, bucket, object) if err == nil { - return xlMetaParts, nil + return xlMetaParts, xlMeta, nil } // For any reason disk or bucket is not available continue // and read from other disks. @@ -406,12 +409,12 @@ func (xl xlObjects) readXLMetaParts(bucket, object string) (xlMetaParts []object continue } // Error is not ignored, return right here. - return nil, err + return nil, nil, err } // If all errors were ignored, reduce to maximal occurrence // based on the read quorum. readQuorum := len(xl.getDisks()) / 2 - return nil, reduceReadQuorumErrs(ignoredErrs, nil, readQuorum) + return nil, nil, reduceReadQuorumErrs(ignoredErrs, nil, readQuorum) } // readXLMetaStat - return xlMetaV1.Stat and xlMetaV1.Meta from one of the disks picked at random. diff --git a/cmd/xl-v1-metadata_test.go b/cmd/xl-v1-metadata_test.go index beadfafd3..12d1e1604 100644 --- a/cmd/xl-v1-metadata_test.go +++ b/cmd/xl-v1-metadata_test.go @@ -159,7 +159,7 @@ func testXLReadMetaParts(obj ObjectLayer, instanceType string, disks []string, t uploadIDPath := path.Join(bucketNames[0], objectNames[0], uploadIDs[0]) - _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath) + _, _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath) if err != nil { t.Fatal(err) } @@ -168,7 +168,7 @@ func testXLReadMetaParts(obj ObjectLayer, instanceType string, disks []string, t removeDiskN(disks, 7) // Removing disk shouldn't affect reading object parts info. 
-	_, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath)
+	_, _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -178,7 +178,7 @@ func testXLReadMetaParts(obj ObjectLayer, instanceType string, disks []string, t
 		os.RemoveAll(path.Join(disk, minioMetaMultipartBucket, bucketNames[0]))
 	}
 
-	_, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath)
+	_, _, err = obj.(*xlObjects).readXLMetaParts(minioMetaMultipartBucket, uploadIDPath)
 	if errors2.Cause(err) != errFileNotFound {
 		t.Fatal(err)
 	}
diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go
index be95ccf13..1f87df46e 100644
--- a/cmd/xl-v1-multipart.go
+++ b/cmd/xl-v1-multipart.go
@@ -19,7 +19,6 @@ package cmd
 import (
 	"encoding/hex"
 	"fmt"
-	"io"
 	"path"
 	"strings"
 	"sync"
@@ -600,31 +599,25 @@ func (xl xlObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, u
 		return pi, err
 	}
 
-	// Initialize pipe.
-	pipeReader, pipeWriter := io.Pipe()
-
 	go func() {
-		if gerr := xl.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil {
+		if gerr := xl.getObject(srcBucket, srcObject, startOffset, length, srcInfo.Writer, srcInfo.ETag); gerr != nil {
+			if gerr = srcInfo.Writer.Close(); gerr != nil {
+				errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject)
+			}
+			return
+		}
+		// Close writer explicitly signalling we wrote all data.
+		if gerr := srcInfo.Writer.Close(); gerr != nil {
 			errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject)
-			pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject))
 			return
 		}
-		pipeWriter.Close() // Close writer explicitly signalling we wrote all data.
 	}()
 
-	hashReader, err := hash.NewReader(pipeReader, length, "", "")
+	partInfo, err := xl.PutObjectPart(dstBucket, dstObject, uploadID, partID, srcInfo.Reader)
 	if err != nil {
 		return pi, toObjectErr(err, dstBucket, dstObject)
 	}
 
-	partInfo, err := xl.PutObjectPart(dstBucket, dstObject, uploadID, partID, hashReader)
-	if err != nil {
-		return pi, toObjectErr(err, dstBucket, dstObject)
-	}
-
-	// Explicitly close the reader.
-	pipeReader.Close()
-
 	// Success.
 	return partInfo, nil
 }
@@ -817,7 +810,7 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM
 
 	uploadIDPath := path.Join(bucket, object, uploadID)
 
-	xlParts, err := xl.readXLMetaParts(minioMetaMultipartBucket, uploadIDPath)
+	xlParts, xlMeta, err := xl.readXLMetaParts(minioMetaMultipartBucket, uploadIDPath)
 	if err != nil {
 		return lpi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
 	}
@@ -828,6 +821,7 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM
 	result.UploadID = uploadID
 	result.MaxParts = maxParts
 	result.PartNumberMarker = partNumberMarker
+	result.UserDefined = xlMeta
 
 	// For empty number of parts or maxParts as zero, return right here.
 	if len(xlParts) == 0 || maxParts == 0 {
@@ -1111,20 +1105,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 		return oi, toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
 	}
 
-	objInfo := ObjectInfo{
-		IsDir:           false,
-		Bucket:          bucket,
-		Name:            object,
-		Size:            xlMeta.Stat.Size,
-		ModTime:         xlMeta.Stat.ModTime,
-		ETag:            xlMeta.Meta["etag"],
-		ContentType:     xlMeta.Meta["content-type"],
-		ContentEncoding: xlMeta.Meta["content-encoding"],
-		UserDefined:     xlMeta.Meta,
-	}
-
 	// Success, return object info.
-	return objInfo, nil
+	return xlMeta.ToObjectInfo(bucket, object), nil
 }
 
 // Wrapper which removes all the uploaded parts.
diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go
index a3e148e1d..e8ec39288 100644
--- a/cmd/xl-v1-object.go
+++ b/cmd/xl-v1-object.go
@@ -395,26 +395,7 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er
 		return objInfo, err
 	}
 
-	objInfo = ObjectInfo{
-		IsDir:           false,
-		Bucket:          bucket,
-		Name:            object,
-		Size:            xlMeta.Stat.Size,
-		ModTime:         xlMeta.Stat.ModTime,
-		ContentType:     xlMeta.Meta["content-type"],
-		ContentEncoding: xlMeta.Meta["content-encoding"],
-	}
-
-	// Extract etag.
-	objInfo.ETag = extractETag(xlMeta.Meta)
-
-	// etag/md5Sum has already been extracted. We need to
-	// remove to avoid it from appearing as part of
-	// response headers. e.g, X-Minio-* or X-Amz-*.
-	objInfo.UserDefined = cleanMetadata(xlMeta.Meta)
-
-	// Success.
-	return objInfo, nil
+	return xlMeta.ToObjectInfo(bucket, object), nil
 }
 
 func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) {
diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go
index 92b164d00..565948276 100644
--- a/cmd/xl-v1-utils.go
+++ b/cmd/xl-v1-utils.go
@@ -244,16 +244,18 @@ func xlMetaV1UnmarshalJSON(xlMetaBuf []byte) (xlMeta xlMetaV1, e error) {
 }
 
 // read xl.json from the given disk, parse and return xlMetaV1.Parts.
-func readXLMetaParts(disk StorageAPI, bucket string, object string) ([]objectPartInfo, error) {
+func readXLMetaParts(disk StorageAPI, bucket string, object string) ([]objectPartInfo, map[string]string, error) {
 	// Reads entire `xl.json`.
 	xlMetaBuf, err := disk.ReadAll(bucket, path.Join(object, xlMetaJSONFile))
 	if err != nil {
-		return nil, errors2.Trace(err)
+		return nil, nil, errors2.Trace(err)
 	}
+
 	// obtain xlMetaV1{}.Parts using `github.com/tidwall/gjson`.
 	xlMetaParts := parseXLParts(xlMetaBuf)
+	xlMetaMap := parseXLMetaMap(xlMetaBuf)
 
-	return xlMetaParts, nil
+	return xlMetaParts, xlMetaMap, nil
 }
 
 // read xl.json from the given disk and parse xlV1Meta.Stat and xlV1Meta.Meta using gjson.
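For reference, a sketch of the per-part key schedule used above by PutObjectPartHandler and DecryptBlocksWriter.buildDecrypter: the part number is marshalled little-endian and run through HMAC-SHA256 keyed with the unsealed object encryption key, so each part gets an independent key. The patch imports minio/sha256-simd; the standard library crypto/sha256 is substituted here to keep the sketch self-contained, and partKey is a hypothetical helper name:

package main

import (
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// partKey derives the per-part encryption key the same way buildDecrypter
// and PutObjectPartHandler do: marshal the part ID little-endian and run
// it through HMAC-SHA256 keyed with the object encryption key.
func partKey(objectEncryptionKey []byte, partID int) []byte {
	var partIDbin [4]byte
	binary.LittleEndian.PutUint32(partIDbin[:], uint32(partID))

	mac := hmac.New(sha256.New, objectEncryptionKey)
	mac.Write(partIDbin[:])
	return mac.Sum(nil)
}

func main() {
	objectKey := make([]byte, 32) // stand-in for the unsealed object key
	if _, err := rand.Read(objectKey); err != nil {
		panic(err)
	}
	// Each part is encrypted under its own key, so parts can be uploaded,
	// re-uploaded and copied independently without nonce reuse.
	fmt.Printf("part 1 key: %x\n", partKey(objectKey, 1))
	fmt.Printf("part 2 key: %x\n", partKey(objectKey, 2))
}

And a sketch of the range arithmetic in getEncryptedStartOffset: a requested plaintext range is widened to whole DARE packages, yielding the sequence number of the first package, that package's byte offset in the encrypted object, and the number of encrypted bytes to read. The constants mirror sseDAREPackageBlockSize and sseDAREPackageMetaSize; encryptedRange is a hypothetical name:

package main

import "fmt"

const (
	blockSize = 64 * 1024 // sseDAREPackageBlockSize
	metaSize  = 32        // sseDAREPackageMetaSize
)

// encryptedRange mirrors getEncryptedStartOffset: round the start of the
// range down, and the end of the range up, to whole packages.
func encryptedRange(offset, length int64) (seqNumber uint32, encOffset, encLength int64) {
	onePkgSize := int64(blockSize + metaSize)

	seqNumber = uint32(offset / blockSize)
	encOffset = int64(seqNumber) * onePkgSize

	encLength = ((offset+length)/blockSize)*onePkgSize - encOffset
	if (offset+length)%blockSize > 0 {
		encLength += onePkgSize // a partial trailing block needs one more package
	}
	return seqNumber, encOffset, encLength
}

func main() {
	// Reading 100000 plaintext bytes at offset 70000 touches packages 1 and 2,
	// so decryption starts at sequence number 1, byte 65568 of the encrypted stream.
	seq, off, n := encryptedRange(70000, 100000)
	fmt.Println(seq, off, n) // 1 65568 131136
}

This is also why GetObjectHandler wraps the response in ioutil.LimitedWriter with a skip of startOffset%(64*1024): decryption necessarily starts at a package boundary, and the limited writer discards the decrypted bytes that precede the requested offset.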