diff --git a/cmd/api-headers.go b/cmd/api-headers.go index c382a2fd7..1ed996d86 100644 --- a/cmd/api-headers.go +++ b/cmd/api-headers.go @@ -61,7 +61,7 @@ func encodeResponseJSON(response interface{}) []byte { } // Write object header -func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSpec) { +func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, contentRange *httpRange) { // set common headers setCommonHeaders(w) @@ -96,9 +96,9 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp } // for providing ranged content - if rs != nil { + if contentRange != nil && contentRange.offsetBegin > -1 { // Override content-length - w.Header().Set("Content-Length", strconv.FormatInt(rs.GetLength(objInfo.Size), 10)) - w.Header().Set("Content-Range", rs.ContentRangeString(objInfo.Size)) + w.Header().Set("Content-Length", strconv.FormatInt(contentRange.getLength(), 10)) + w.Header().Set("Content-Range", contentRange.String()) } } diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 650f487bf..f44ef7375 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -57,7 +57,6 @@ type cacheObjects struct { // file path patterns to exclude from cache exclude []string // Object functions pointing to the corresponding functions of backend implementation. - GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) GetObjectFn func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) GetObjectInfoFn func(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) PutObjectFn func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) @@ -89,7 +88,6 @@ type CacheObjectLayer interface { ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) DeleteBucket(ctx context.Context, bucket string) error // Object operations. - GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) @@ -177,75 +175,6 @@ func (c cacheObjects) getMetadata(objInfo ObjectInfo) map[string]string { return metadata } -func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, r io.ReadCloser, err error) { - - bkObjInfo, bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs) - - if c.isCacheExclude(bucket, object) { - return bkObjInfo, bkReader, bkErr - } - - // fetch cacheFSObjects if object is currently cached or nearest available cache drive - dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object) - if err != nil { - return bkObjInfo, bkReader, bkErr - } - - backendDown := backendDownError(bkErr) - if bkErr != nil && !backendDown { - if _, ok := err.(ObjectNotFound); ok { - // Delete the cached entry if backend object was deleted. - dcache.Delete(ctx, bucket, object) - } - return oi, r, bkErr - } - - if !backendDown && filterFromCache(bkObjInfo.UserDefined) { - return bkObjInfo, bkReader, bkErr - } - - cacheObjInfo, cacheReader, cacheErr := dcache.GetObjectNInfo(ctx, bucket, object, rs) - if cacheErr == nil { - if backendDown { - // If the backend is down, serve the request from cache. - return cacheObjInfo, cacheReader, nil - } - if cacheObjInfo.ETag == bkObjInfo.ETag && !isStaleCache(bkObjInfo) { - return cacheObjInfo, cacheReader, nil - } - dcache.Delete(ctx, bucket, object) - } - - if rs != nil { - // We don't cache partial objects. - return bkObjInfo, bkReader, bkErr - } - if !dcache.diskAvailable(bkObjInfo.Size * cacheSizeMultiplier) { - // cache only objects < 1/100th of disk capacity - return bkObjInfo, bkReader, bkErr - } - - // Initialize pipe. - pipeReader, pipeWriter := io.Pipe() - teeReader := io.TeeReader(bkReader, pipeWriter) - hashReader, herr := hash.NewReader(pipeReader, bkObjInfo.Size, "", "") - if err != nil { - return oi, r, herr - } - - cleanupBackend := func() { bkReader.Close() } - getObjReader := NewGetObjectReader(teeReader, nil, cleanupBackend) - - go func() { - putErr := dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(bkObjInfo)) - // close the write end of the pipe, so the error gets - // propagated to getObjReader - pipeWriter.CloseWithError(putErr) - }() - - return bkObjInfo, getObjReader, nil -} - // Uses cached-object to serve the request. If object is not cached it serves the request from the backend and also // stores it in the cache for serving subsequent requests. func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) { diff --git a/cmd/dummy-object-layer_test.go b/cmd/dummy-object-layer_test.go index 192866cff..0c0a512da 100644 --- a/cmd/dummy-object-layer_test.go +++ b/cmd/dummy-object-layer_test.go @@ -59,10 +59,6 @@ func (api *DummyObjectLayer) ListObjectsV2(ctx context.Context, bucket, prefix, return } -func (api *DummyObjectLayer) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { - return -} - func (api *DummyObjectLayer) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) { return } diff --git a/cmd/encryption-v1.go b/cmd/encryption-v1.go index 94223dc6a..f822d6d50 100644 --- a/cmd/encryption-v1.go +++ b/cmd/encryption-v1.go @@ -312,155 +312,6 @@ func newDecryptWriterWithObjectKey(client io.Writer, objectEncryptionKey []byte, return writer, nil } -// Adding support for reader based interface - -// DecryptRequestWithSequenceNumberR - same as -// DecryptRequestWithSequenceNumber but with a reader -func DecryptRequestWithSequenceNumberR(client io.Reader, r *http.Request, bucket, object string, seqNumber uint32, metadata map[string]string) (io.Reader, error) { - if crypto.S3.IsEncrypted(metadata) { - return newDecryptReader(client, nil, bucket, object, seqNumber, metadata) - } - - key, err := ParseSSECustomerRequest(r) - if err != nil { - return nil, err - } - delete(metadata, crypto.SSECKey) // make sure we do not save the key by accident - return newDecryptReader(client, key, bucket, object, seqNumber, metadata) -} - -// DecryptCopyRequestR - same as DecryptCopyRequest, but with a -// Reader -func DecryptCopyRequestR(client io.Reader, r *http.Request, bucket, object string, metadata map[string]string) (io.Reader, error) { - var ( - key []byte - err error - ) - if crypto.SSECopy.IsRequested(r.Header) { - key, err = ParseSSECopyCustomerRequest(r, metadata) - if err != nil { - return nil, err - } - } - delete(metadata, crypto.SSECopyKey) // make sure we do not save the key by accident - return newDecryptReader(client, key, bucket, object, 0, metadata) -} - -func newDecryptReader(client io.Reader, key []byte, bucket, object string, seqNumber uint32, metadata map[string]string) (io.Reader, error) { - objectEncryptionKey, err := decryptObjectInfo(key, bucket, object, metadata) - if err != nil { - return nil, err - } - return newDecryptReaderWithObjectKey(client, objectEncryptionKey, seqNumber, metadata) -} - -func newDecryptReaderWithObjectKey(client io.Reader, objectEncryptionKey []byte, seqNumber uint32, metadata map[string]string) (io.Reader, error) { - reader, err := sio.DecryptReader(client, sio.Config{ - Key: objectEncryptionKey, - SequenceNumber: seqNumber, - }) - if err != nil { - return nil, crypto.ErrInvalidCustomerKey - } - delete(metadata, crypto.SSEIV) - delete(metadata, crypto.SSESealAlgorithm) - delete(metadata, crypto.SSECSealedKey) - delete(metadata, crypto.SSEMultipart) - delete(metadata, crypto.S3SealedKey) - delete(metadata, crypto.S3KMSSealedKey) - delete(metadata, crypto.S3KMSKeyID) - return reader, nil -} - -// DecryptBlocksRequestR - same as DecryptBlocksRequest but with a -// reader -func DecryptBlocksRequestR(client io.Reader, r *http.Request, bucket, object string, startOffset, length int64, objInfo ObjectInfo, copySource bool) (io.Reader, int64, int64, error) { - var seqNumber uint32 - var encStartOffset, encLength int64 - - if len(objInfo.Parts) == 0 || !crypto.IsMultiPart(objInfo.UserDefined) { - seqNumber, encStartOffset, encLength = getEncryptedSinglePartOffsetLength(startOffset, length, objInfo) - - var reader io.Reader - var err error - if copySource { - reader, err = DecryptCopyRequestR(client, r, bucket, object, objInfo.UserDefined) - } else { - reader, err = DecryptRequestWithSequenceNumberR(client, r, bucket, object, seqNumber, objInfo.UserDefined) - } - if err != nil { - return nil, 0, 0, err - } - return reader, encStartOffset, encLength, nil - } - - seqNumber, encStartOffset, encLength = getEncryptedMultipartsOffsetLength(startOffset, length, objInfo) - var partStartIndex int - var partStartOffset = startOffset - // Skip parts until final offset maps to a particular part offset. - for i, part := range objInfo.Parts { - decryptedSize, err := sio.DecryptedSize(uint64(part.Size)) - if err != nil { - return nil, -1, -1, errObjectTampered - } - - partStartIndex = i - - // Offset is smaller than size we have reached the - // proper part offset, break out we start from - // this part index. - if partStartOffset < int64(decryptedSize) { - break - } - - // Continue to look for next part. - partStartOffset -= int64(decryptedSize) - } - - startSeqNum := partStartOffset / sseDAREPackageBlockSize - partEncRelOffset := int64(startSeqNum) * (sseDAREPackageBlockSize + sseDAREPackageMetaSize) - - w := &DecryptBlocksReader{ - reader: client, - startSeqNum: uint32(startSeqNum), - partEncRelOffset: partEncRelOffset, - parts: objInfo.Parts, - partIndex: partStartIndex, - req: r, - bucket: bucket, - object: object, - customerKeyHeader: r.Header.Get(crypto.SSECKey), - copySource: copySource, - } - - w.metadata = map[string]string{} - // Copy encryption metadata for internal use. - for k, v := range objInfo.UserDefined { - w.metadata[k] = v - } - - // Purge all the encryption headers. - delete(objInfo.UserDefined, crypto.SSEIV) - delete(objInfo.UserDefined, crypto.SSESealAlgorithm) - delete(objInfo.UserDefined, crypto.SSECSealedKey) - delete(objInfo.UserDefined, crypto.SSEMultipart) - - if crypto.S3.IsEncrypted(objInfo.UserDefined) { - delete(objInfo.UserDefined, crypto.S3SealedKey) - delete(objInfo.UserDefined, crypto.S3KMSKeyID) - delete(objInfo.UserDefined, crypto.S3KMSSealedKey) - } - if w.copySource { - w.customerKeyHeader = r.Header.Get(crypto.SSECopyKey) - } - - if err := w.buildDecrypter(w.parts[w.partIndex].Number); err != nil { - return nil, 0, 0, err - } - - return w, encStartOffset, encLength, nil -} - // DecryptRequestWithSequenceNumber decrypts the object with the client provided key. It also removes // the client-side-encryption metadata from the object and sets the correct headers. func DecryptRequestWithSequenceNumber(client io.Writer, r *http.Request, bucket, object string, seqNumber uint32, metadata map[string]string) (io.WriteCloser, error) { @@ -482,123 +333,6 @@ func DecryptRequest(client io.Writer, r *http.Request, bucket, object string, me return DecryptRequestWithSequenceNumber(client, r, bucket, object, 0, metadata) } -// DecryptBlocksReader - decrypts multipart parts, while implementing -// a io.Reader compatible interface. -type DecryptBlocksReader struct { - // Source of the encrypted content that will be decrypted - reader io.Reader - // Current decrypter for the current encrypted data block - decrypter io.Reader - // Start sequence number - startSeqNum uint32 - // Current part index - partIndex int - // Parts information - parts []objectPartInfo - req *http.Request - bucket, object string - metadata map[string]string - - partEncRelOffset int64 - - copySource bool - // Customer Key - customerKeyHeader string -} - -func (d *DecryptBlocksReader) buildDecrypter(partID int) error { - m := make(map[string]string) - for k, v := range d.metadata { - m[k] = v - } - // Initialize the first decrypter; new decrypters will be - // initialized in Read() operation as needed. - var key []byte - var err error - if d.copySource { - if crypto.SSEC.IsEncrypted(d.metadata) { - d.req.Header.Set(crypto.SSECopyKey, d.customerKeyHeader) - key, err = ParseSSECopyCustomerRequest(d.req, d.metadata) - } - } else { - if crypto.SSEC.IsEncrypted(d.metadata) { - d.req.Header.Set(crypto.SSECKey, d.customerKeyHeader) - key, err = ParseSSECustomerRequest(d.req) - } - } - if err != nil { - return err - } - - objectEncryptionKey, err := decryptObjectInfo(key, d.bucket, d.object, m) - if err != nil { - return err - } - - var partIDbin [4]byte - binary.LittleEndian.PutUint32(partIDbin[:], uint32(partID)) // marshal part ID - - mac := hmac.New(sha256.New, objectEncryptionKey) // derive part encryption key from part ID and object key - mac.Write(partIDbin[:]) - partEncryptionKey := mac.Sum(nil) - - // make sure we do not save the key by accident - if d.copySource { - delete(m, crypto.SSECopyKey) - } else { - delete(m, crypto.SSECKey) - } - - // make sure to provide a NopCloser such that a Close - // on sio.decryptWriter doesn't close the underlying writer's - // close which perhaps can close the stream prematurely. - decrypter, err := newDecryptReaderWithObjectKey(d.reader, partEncryptionKey, d.startSeqNum, m) - if err != nil { - return err - } - - d.decrypter = decrypter - return nil -} - -func (d *DecryptBlocksReader) Read(p []byte) (int, error) { - var err error - var n1 int - if int64(len(p)) < d.parts[d.partIndex].Size-d.partEncRelOffset { - n1, err = d.decrypter.Read(p) - if err != nil { - return 0, err - } - d.partEncRelOffset += int64(n1) - } else { - n1, err = d.decrypter.Read(p[:d.parts[d.partIndex].Size-d.partEncRelOffset]) - if err != nil { - return 0, err - } - - // We should now proceed to next part, reset all - // values appropriately. - d.partEncRelOffset = 0 - d.startSeqNum = 0 - - d.partIndex++ - - err = d.buildDecrypter(d.partIndex + 1) - if err != nil { - return 0, err - } - - n1, err = d.decrypter.Read(p[n1:]) - if err != nil { - return 0, err - } - - d.partEncRelOffset += int64(n1) - } - - return len(p), nil -} - // DecryptBlocksWriter - decrypts multipart parts, while implementing a io.Writer compatible interface. type DecryptBlocksWriter struct { // Original writer where the plain data will be written diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index fb25aec04..d7ea10b59 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -17,7 +17,6 @@ package cmd import ( - "bytes" "context" "encoding/hex" "io" @@ -499,96 +498,6 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu return objInfo, nil } -// GetObjectNInfo - returns object info and a reader for object -// content. -func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { - - if err = checkGetObjArgs(ctx, bucket, object); err != nil { - return objInfo, reader, err - } - - if _, err = fs.statBucketDir(ctx, bucket); err != nil { - return objInfo, reader, toObjectErr(err, bucket) - } - - // Lock the object before reading. - lock := fs.nsMutex.NewNSLock(bucket, object) - if err = lock.GetRLock(globalObjectTimeout); err != nil { - logger.LogIf(ctx, err) - return objInfo, reader, err - } - - // For a directory, we need to send an empty body. - if hasSuffix(object, slashSeparator) { - // The lock taken above is released when - // objReader.Close() is called by the caller. - objReader := NewGetObjectReader(bytes.NewBuffer(nil), lock, nil) - return objInfo, objReader, nil - } - - // Otherwise we get the object info - objInfo, err = fs.getObjectInfo(ctx, bucket, object) - err = toObjectErr(err, bucket, object) - if err != nil { - lock.RUnlock() - return objInfo, nil, err - } - - // Take a rwPool lock for NFS gateway type deployment - var cleanUp func() - if bucket != minioMetaBucket { - fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile) - _, err = fs.rwPool.Open(fsMetaPath) - if err != nil && err != errFileNotFound { - logger.LogIf(ctx, err) - lock.RUnlock() - return objInfo, nil, toObjectErr(err, bucket, object) - } - cleanUp = func() { - // Need to clean up lock after getObject is - // completed. - fs.rwPool.Close(fsMetaPath) - } - } - - offset, length := int64(0), objInfo.Size - if rs != nil { - offset, length = rs.GetOffsetLength(objInfo.Size) - } - - // Read the object, doesn't exist returns an s3 compatible error. - fsObjPath := pathJoin(fs.fsPath, bucket, object) - reader, size, err := fsOpenFile(ctx, fsObjPath, offset) - if err != nil { - lock.RUnlock() - cleanUp() - return objInfo, nil, toObjectErr(err, bucket, object) - } - - bufSize := int64(readSizeV1) - if length > 0 && bufSize > length { - bufSize = length - } - - // For negative length we read everything. - if length < 0 { - length = size - offset - } - - // Reply back invalid range if the input offset and length - // fall out of range. - if offset > size || offset+length > size { - err = InvalidRange{offset, length, size} - logger.LogIf(ctx, err) - lock.RUnlock() - cleanUp() - return objInfo, nil, err - } - - objReader := NewGetObjectReader(io.LimitReader(reader, length), lock, cleanUp) - return objInfo, objReader, nil -} - // GetObject - reads an object from the disk. // Supports additional parameters like offset and length // which are synonymous with HTTP Range requests. diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index 8f12ee669..c52c11699 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -615,27 +615,6 @@ func (a *azureObjects) ListObjectsV2(ctx context.Context, bucket, prefix, contin return result, nil } -func (a *azureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { - objInfo, err = a.GetObjectInfo(ctx, bucket, object) - if err != nil { - return objInfo, reader, err - } - - startOffset, length := int64(0), objInfo.Size - if rs != nil { - startOffset, length = rs.GetOffsetLength(objInfo.Size) - } - - pr, pw := io.Pipe() - objReader := minio.NewGetObjectReader(pr, nil, nil) - go func() { - err := a.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) - pw.CloseWithError(err) - }() - - return objInfo, objReader, nil -} - // GetObject - reads an object from azure. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. diff --git a/cmd/gateway/b2/gateway-b2.go b/cmd/gateway/b2/gateway-b2.go index ecd0d7455..4c3366f99 100644 --- a/cmd/gateway/b2/gateway-b2.go +++ b/cmd/gateway/b2/gateway-b2.go @@ -394,27 +394,6 @@ func (l *b2Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat return loi, nil } -func (l *b2Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { - objInfo, err = l.GetObjectInfo(ctx, bucket, object) - if err != nil { - return objInfo, reader, err - } - - startOffset, length := int64(0), objInfo.Size - if rs != nil { - startOffset, length = rs.GetOffsetLength(objInfo.Size) - } - - pr, pw := io.Pipe() - objReader := minio.NewGetObjectReader(pr, nil, nil) - go func() { - err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) - pw.CloseWithError(err) - }() - - return objInfo, objReader, nil -} - // GetObject reads an object from B2. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 23924c337..cd1135a2b 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -732,27 +732,6 @@ func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continua }, nil } -func (l *gcsGateway) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { - objInfo, err = l.GetObjectInfo(ctx, bucket, object) - if err != nil { - return objInfo, reader, err - } - - startOffset, length := int64(0), objInfo.Size - if rs != nil { - startOffset, length = rs.GetOffsetLength(objInfo.Size) - } - - pr, pw := io.Pipe() - objReader := minio.NewGetObjectReader(pr, nil, nil) - go func() { - err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) - pw.CloseWithError(err) - }() - - return objInfo, objReader, nil -} - // GetObject - reads an object from GCS. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. diff --git a/cmd/gateway/manta/gateway-manta.go b/cmd/gateway/manta/gateway-manta.go index d0fd21d62..16f51f0a6 100644 --- a/cmd/gateway/manta/gateway-manta.go +++ b/cmd/gateway/manta/gateway-manta.go @@ -497,27 +497,6 @@ func (t *tritonObjects) ListObjectsV2(ctx context.Context, bucket, prefix, conti return result, nil } -func (t *tritonObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { - objInfo, err = t.GetObjectInfo(ctx, bucket, object) - if err != nil { - return objInfo, reader, err - } - - startOffset, length := int64(0), objInfo.Size - if rs != nil { - startOffset, length = rs.GetOffsetLength(objInfo.Size) - } - - pr, pw := io.Pipe() - objReader := minio.NewGetObjectReader(pr, nil, nil) - go func() { - err := t.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) - pw.CloseWithError(err) - }() - - return objInfo, objReader, nil -} - // GetObject - Reads an object from Manta. Supports additional parameters like // offset and length which are synonymous with HTTP Range requests. // diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index a8c7d5b17..1423d6b7a 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -69,7 +69,7 @@ ENVIRONMENT VARIABLES: DOMAIN: MINIO_DOMAIN: To enable virtual-host-style requests, set this value to Minio host domain name. - + CACHE: MINIO_CACHE_DRIVES: List of mounted drives or directories delimited by ";". MINIO_CACHE_EXCLUDE: List of cache exclusion patterns delimited by ";". @@ -547,27 +547,6 @@ func ossGetObject(ctx context.Context, client *oss.Client, bucket, key string, s return nil } -func (l *ossObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { - objInfo, err = l.GetObjectInfo(ctx, bucket, object) - if err != nil { - return objInfo, reader, err - } - - startOffset, length := int64(0), objInfo.Size - if rs != nil { - startOffset, length = rs.GetOffsetLength(objInfo.Size) - } - - pr, pw := io.Pipe() - objReader := minio.NewGetObjectReader(pr, nil, nil) - go func() { - err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) - pw.CloseWithError(err) - }() - - return objInfo, objReader, nil -} - // GetObject reads an object on OSS. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 3569b0266..851e3a07d 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -301,27 +301,6 @@ func (l *s3Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat return minio.FromMinioClientListBucketV2Result(bucket, result), nil } -func (l *s3Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { - objInfo, err = l.GetObjectInfo(ctx, bucket, object) - if err != nil { - return objInfo, reader, err - } - - startOffset, length := int64(0), objInfo.Size - if rs != nil { - startOffset, length = rs.GetOffsetLength(objInfo.Size) - } - - pr, pw := io.Pipe() - objReader := minio.NewGetObjectReader(pr, nil, nil) - go func() { - err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) - pw.CloseWithError(err) - }() - - return objInfo, objReader, nil -} - // GetObject reads an object from S3. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. @@ -334,9 +313,6 @@ func (l *s3Objects) GetObject(ctx context.Context, bucket string, key string, st } opts := miniogo.GetObjectOptions{} - if etag != "" { - opts.SetMatchETag(etag) - } if startOffset >= 0 && length >= 0 { if err := opts.SetRange(startOffset, startOffset+length-1); err != nil { logger.LogIf(ctx, err) diff --git a/cmd/gateway/sia/gateway-sia.go b/cmd/gateway/sia/gateway-sia.go index 33bb84634..f2d2f49dd 100644 --- a/cmd/gateway/sia/gateway-sia.go +++ b/cmd/gateway/sia/gateway-sia.go @@ -431,27 +431,6 @@ func (s *siaObjects) ListObjects(ctx context.Context, bucket string, prefix stri return loi, nil } -func (s *siaObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { - objInfo, err = s.GetObjectInfo(ctx, bucket, object) - if err != nil { - return objInfo, reader, err - } - - startOffset, length := int64(0), objInfo.Size - if rs != nil { - startOffset, length = rs.GetOffsetLength(objInfo.Size) - } - - pr, pw := io.Pipe() - objReader := minio.NewGetObjectReader(pr, nil, nil) - go func() { - err := s.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) - pw.CloseWithError(err) - }() - - return objInfo, objReader, nil -} - func (s *siaObjects) GetObject(ctx context.Context, bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error { dstFile := path.Join(s.TempDir, minio.MustGetUUID()) defer os.Remove(dstFile) diff --git a/cmd/httprange.go b/cmd/httprange.go index 77eaaad76..bed49eba8 100644 --- a/cmd/httprange.go +++ b/cmd/httprange.go @@ -133,108 +133,3 @@ func parseRequestRange(rangeString string, resourceSize int64) (hrange *httpRang return &httpRange{offsetBegin, offsetEnd, resourceSize}, nil } - -// HTTPRangeSpec represents a range specification as supported by S3 GET -// object request. -// -// Case 1: Not present -> represented by a nil RangeSpec -// Case 2: bytes=1-10 (absolute start and end offsets) -> RangeSpec{false, 1, 10} -// Case 3: bytes=10- (absolute start offset with end offset unspecified) -> RangeSpec{false, 10, -1} -// Case 4: bytes=-30 (suffix length specification) -> RangeSpec{true, -30, -1} -type HTTPRangeSpec struct { - // Does the range spec refer to a suffix of the object? - IsSuffixLength bool - - // Start and end offset specified in range spec - Start, End int64 -} - -// ContentRangeString populate range stringer interface -func (h *HTTPRangeSpec) ContentRangeString(resourceSize int64) string { - start, rangeLength := h.GetOffsetLength(resourceSize) - return fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLength-1, resourceSize) -} - -// GetLength - get length of range -func (h *HTTPRangeSpec) GetLength(resourceSize int64) int64 { - switch { - case h.IsSuffixLength: - specifiedLen := -h.Start - if specifiedLen > resourceSize { - specifiedLen = resourceSize - } - return specifiedLen - case h.End > -1: - end := h.End - if resourceSize < end { - end = resourceSize - 1 - } - return end - h.Start + 1 - default: - return resourceSize - h.Start - } -} - -// GetOffsetLength computes the start offset and length of the range -// given the size of the resource -func (h *HTTPRangeSpec) GetOffsetLength(resourceSize int64) (start int64, length int64) { - length = h.GetLength(resourceSize) - start = h.Start - if h.IsSuffixLength { - start = resourceSize + h.Start - } - return -} - -// Parses a range header value into a HTTPRangeSpec -func parseRequestRangeSpec(rangeString string) (hrange *HTTPRangeSpec, err error) { - // Return error if given range string doesn't start with byte range prefix. - if !strings.HasPrefix(rangeString, byteRangePrefix) { - return nil, fmt.Errorf("'%s' does not start with '%s'", rangeString, byteRangePrefix) - } - - // Trim byte range prefix. - byteRangeString := strings.TrimPrefix(rangeString, byteRangePrefix) - - // Check if range string contains delimiter '-', else return error. eg. "bytes=8" - sepIndex := strings.Index(byteRangeString, "-") - if sepIndex == -1 { - return nil, fmt.Errorf("'%s' does not have a valid range value", rangeString) - } - - offsetBeginString := byteRangeString[:sepIndex] - offsetBegin := int64(-1) - // Convert offsetBeginString only if its not empty. - if len(offsetBeginString) > 0 { - if offsetBegin, err = strconv.ParseInt(offsetBeginString, 10, 64); err != nil { - return nil, fmt.Errorf("'%s' does not have a valid first byte position value", rangeString) - } - } - - offsetEndString := byteRangeString[sepIndex+1:] - offsetEnd := int64(-1) - // Convert offsetEndString only if its not empty. - if len(offsetEndString) > 0 { - if offsetEnd, err = strconv.ParseInt(offsetEndString, 10, 64); err != nil { - return nil, fmt.Errorf("'%s' does not have a valid last byte position value", rangeString) - } - } - - switch { - case offsetBegin > -1 && offsetEnd > -1: - if offsetBegin > offsetEnd { - return nil, errInvalidRange - } - return &HTTPRangeSpec{false, offsetBegin, offsetEnd}, nil - case offsetBegin > -1: - return &HTTPRangeSpec{false, offsetBegin, -1}, nil - case offsetEnd > -1: - if offsetEnd == 0 { - return nil, errInvalidRange - } - return &HTTPRangeSpec{true, -offsetEnd, -1}, nil - default: - // rangeString contains first and last byte positions missing. eg. "bytes=-" - return nil, fmt.Errorf("'%s' does not have valid range value", rangeString) - } -} diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 2808309e0..066da89e1 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -40,9 +40,6 @@ type ObjectLayer interface { ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) // Object operations. - - GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) - GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 8e24edbc2..972868a2b 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -20,12 +20,10 @@ import ( "context" "encoding/hex" "fmt" - "io" "math/rand" "path" "runtime" "strings" - "sync" "time" "unicode/utf8" @@ -304,56 +302,3 @@ type byBucketName []BucketInfo func (d byBucketName) Len() int { return len(d) } func (d byBucketName) Swap(i, j int) { d[i], d[j] = d[j], d[i] } func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name } - -// GetObjectReader is a type that wraps a reader with a lock to -// provide a ReadCloser interface that unlocks on Close() -type GetObjectReader struct { - lock RWLocker - pr io.Reader - - // register any clean up actions (happens before unlocking) - cleanUp func() - - once sync.Once -} - -// NewGetObjectReader creates a new GetObjectReader. The cleanUp -// action is called on Close() before the lock is unlocked. -func NewGetObjectReader(reader io.Reader, lock RWLocker, cleanUp func()) io.ReadCloser { - return &GetObjectReader{ - lock: lock, - pr: reader, - cleanUp: cleanUp, - } -} - -// Close - calls the cleanup action if provided, and *then* unlocks -// the object. Calling Close multiple times is safe. -func (g *GetObjectReader) Close() error { - // sync.Once is used here to ensure that Close() is - // idempotent. - g.once.Do(func() { - // Unlocking is defer-red - this ensures that - // unlocking happens even if cleanUp panics. - defer func() { - if g.lock != nil { - g.lock.RUnlock() - } - }() - if g.cleanUp != nil { - g.cleanUp() - } - }) - return nil -} - -// Read - to implement Reader interface. -func (g *GetObjectReader) Read(p []byte) (n int, err error) { - n, err = g.pr.Read(p) - if err != nil { - // Calling code may not Close() in case of error, so - // we ensure it. - g.Close() - } - return -} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d61abf5e1..56497713b 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -73,6 +73,10 @@ func setHeadGetRespHeaders(w http.ResponseWriter, reqParams url.Values) { // also specify a data serialization format (JSON, CSV) of the object. func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "SelectObject") + var object, bucket string + vars := mux.Vars(r) + bucket = vars["bucket"] + object = vars["object"] // Fetch object stat info. objectAPI := api.ObjectAPI() @@ -81,43 +85,27 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r return } - if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error - writeErrorResponse(w, ErrBadRequest, r.URL) - return + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo } - vars := mux.Vars(r) - bucket := vars["bucket"] - object := vars["object"] - - // Check for auth type to return S3 compatible error. - // type to return the correct error (NoSuchKey vs AccessDenied) if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone { if getRequestAuthType(r) == authTypeAnonymous { // As per "Permission" section in - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html - // If the object you request does not exist, - // the error Amazon S3 returns depends on - // whether you also have the s3:ListBucket - // permission. - // * If you have the s3:ListBucket permission - // on the bucket, Amazon S3 will return an - // HTTP status code 404 ("no such key") - // error. - // * if you don’t have the s3:ListBucket - // permission, Amazon S3 will return an HTTP - // status code 403 ("access denied") error.` + // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html If + // the object you request does not exist, the error Amazon S3 returns + // depends on whether you also have the s3:ListBucket permission. * If you + // have the s3:ListBucket permission on the bucket, Amazon S3 will return + // an HTTP status code 404 ("no such key") error. * if you don’t have the + // s3:ListBucket permission, Amazon S3 will return an HTTP status code 403 + // ("access denied") error.` if globalPolicySys.IsAllowed(policy.Args{ Action: policy.ListBucketAction, BucketName: bucket, ConditionValues: getConditionValues(r, ""), IsOwner: false, }) { - getObjectInfo := objectAPI.GetObjectInfo - if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo - } - _, err := getObjectInfo(ctx, bucket, object) if toAPIErrorCode(err) == ErrNoSuchKey { s3Error = ErrNoSuchKey @@ -127,57 +115,27 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r writeErrorResponse(w, s3Error, r.URL) return } - - // Get request range. - rangeHeader := r.Header.Get("Range") - if rangeHeader != "" { - writeErrorResponse(w, ErrUnsupportedRangeHeader, r.URL) - return - } - if r.ContentLength <= 0 { writeErrorResponse(w, ErrEmptyRequestBody, r.URL) return } - var selectReq ObjectSelectRequest if err := xmlDecoder(r.Body, &selectReq, r.ContentLength); err != nil { writeErrorResponse(w, ErrMalformedXML, r.URL) return } - if !strings.EqualFold(string(selectReq.ExpressionType), "SQL") { - writeErrorResponse(w, ErrInvalidExpressionType, r.URL) - return - } - if len(selectReq.Expression) >= s3select.MaxExpressionLength { - writeErrorResponse(w, ErrExpressionTooLong, r.URL) - return - } - if selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoUse && - selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoNone && - selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoIgnore && - selectReq.InputSerialization.CSV.FileHeaderInfo != "" { - writeErrorResponse(w, ErrInvalidFileHeaderInfo, r.URL) - return - } - if selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAlways && - selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAsNeeded && - selectReq.OutputSerialization.CSV.QuoteFields != "" { - writeErrorResponse(w, ErrInvalidQuoteFields, r.URL) - return - } - - getObjectNInfo := objectAPI.GetObjectNInfo - if api.CacheAPI() != nil { - getObjectNInfo = api.CacheAPI().GetObjectNInfo - } - - objInfo, reader, err := getObjectNInfo(ctx, bucket, object, nil) + objInfo, err := getObjectInfo(ctx, bucket, object) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } + // Get request range. + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + writeErrorResponse(w, ErrUnsupportedRangeHeader, r.URL) + return + } if selectReq.InputSerialization.CompressionType == SelectCompressionGZIP { if !strings.Contains(objInfo.ContentType, "gzip") { @@ -199,51 +157,63 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r return } } + if !strings.EqualFold(string(selectReq.ExpressionType), "SQL") { + writeErrorResponse(w, ErrInvalidExpressionType, r.URL) + return + } + if len(selectReq.Expression) >= s3select.MaxExpressionLength { + writeErrorResponse(w, ErrExpressionTooLong, r.URL) + return + } + if selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoUse && + selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoNone && + selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoIgnore && + selectReq.InputSerialization.CSV.FileHeaderInfo != "" { + writeErrorResponse(w, ErrInvalidFileHeaderInfo, r.URL) + return + } + if selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAlways && + selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAsNeeded && + selectReq.OutputSerialization.CSV.QuoteFields != "" { + writeErrorResponse(w, ErrInvalidQuoteFields, r.URL) + return + } - // If object is encrypted, we avoid the cache layer. - if crypto.IsEncrypted(objInfo.UserDefined) && api.CacheAPI() != nil { - // Close the existing reader before re-querying the backend - if reader != nil { - reader.Close() - } - // Query the backend - objInfo, reader, err = objectAPI.GetObjectNInfo(ctx, bucket, object, nil) - if err != nil { - writeErrorResponse(w, toAPIErrorCode(err), r.URL) - return - } + getObject := objectAPI.GetObject + if api.CacheAPI() != nil && !crypto.SSEC.IsRequested(r.Header) { + getObject = api.CacheAPI().GetObject } - defer reader.Close() - startOffset, length := int64(0), objInfo.Size + reader, pipewriter := io.Pipe() + // Get the object. + var startOffset int64 + length := objInfo.Size + + var writer io.Writer + writer = pipewriter if objectAPI.IsEncryptionSupported() { - s3Encrypted := crypto.IsEncrypted(objInfo.UserDefined) - if s3Encrypted { - var encReader io.Reader - encReader, startOffset, length, err = DecryptBlocksRequestR(reader, r, bucket, object, startOffset, length, objInfo, false) + if crypto.SSEC.IsRequested(r.Header) { + // Response writer should be limited early on for decryption upto required length, + // additionally also skipping mod(offset)64KiB boundaries. + writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length) + + writer, startOffset, length, err = DecryptBlocksRequest(writer, r, bucket, + object, startOffset, length, objInfo, false) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } - // Resulting reader should be limited early on - // for decryption upto required length, - // additionally also skipping mod(offset)64KiB - // boundaries. - encReader = io.LimitReader(ioutil.NewSkipReader(encReader, startOffset%(64*1024)), length) - cleanUp := func() { reader.Close() } - reader = NewGetObjectReader(encReader, nil, cleanUp) - if reader != nil { - defer reader.Close() - } - if s3Encrypted { - w.Header().Set(crypto.SSEHeader, crypto.SSEAlgorithmAES256) - } else { - w.Header().Set(crypto.SSECAlgorithm, r.Header.Get(crypto.SSECAlgorithm)) - w.Header().Set(crypto.SSECKeyMD5, r.Header.Get(crypto.SSECKeyMD5)) - } } } + go func() { + defer reader.Close() + if gerr := getObject(ctx, bucket, object, 0, objInfo.Size, writer, objInfo.ETag); gerr != nil { + pipewriter.CloseWithError(gerr) + return + } + pipewriter.Close() // Close writer explicitly signaling we wrote all data. + }() //s3select //Options if selectReq.OutputSerialization.CSV.FieldDelimiter == "" { @@ -302,34 +272,23 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req bucket := vars["bucket"] object := vars["object"] - // Check for auth type to return S3 compatible error. - // type to return the correct error (NoSuchKey vs AccessDenied) + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone { if getRequestAuthType(r) == authTypeAnonymous { - // As per "Permission" section in - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html - // If the object you request does not exist, - // the error Amazon S3 returns depends on - // whether you also have the s3:ListBucket - // permission. - // * If you have the s3:ListBucket permission - // on the bucket, Amazon S3 will return an - // HTTP status code 404 ("no such key") - // error. - // * if you don’t have the s3:ListBucket - // permission, Amazon S3 will return an HTTP - // status code 403 ("access denied") error.` + // As per "Permission" section in https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html + // If the object you request does not exist, the error Amazon S3 returns depends on whether you also have the s3:ListBucket permission. + // * If you have the s3:ListBucket permission on the bucket, Amazon S3 will return an HTTP status code 404 ("no such key") error. + // * if you don’t have the s3:ListBucket permission, Amazon S3 will return an HTTP status code 403 ("access denied") error.` if globalPolicySys.IsAllowed(policy.Args{ Action: policy.ListBucketAction, BucketName: bucket, ConditionValues: getConditionValues(r, ""), IsOwner: false, }) { - getObjectInfo := objectAPI.GetObjectInfo - if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo - } - _, err := getObjectInfo(ctx, bucket, object) if toAPIErrorCode(err) == ErrNoSuchKey { s3Error = ErrNoSuchKey @@ -340,20 +299,26 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } - getObjectNInfo := objectAPI.GetObjectNInfo - if api.CacheAPI() != nil { - getObjectNInfo = api.CacheAPI().GetObjectNInfo + objInfo, err := getObjectInfo(ctx, bucket, object) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + if objectAPI.IsEncryptionSupported() { + if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } } // Get request range. - var rs *HTTPRangeSpec + var hrange *httpRange rangeHeader := r.Header.Get("Range") if rangeHeader != "" { - var err error - if rs, err = parseRequestRangeSpec(rangeHeader); err != nil { - // Handle only errInvalidRange. Ignore other - // parse error and treat it as regular Get - // request like Amazon S3. + if hrange, err = parseRequestRange(rangeHeader, objInfo.Size); err != nil { + // Handle only errInvalidRange + // Ignore other parse error and treat it as regular Get request like Amazon S3. if err == errInvalidRange { writeErrorResponse(w, ErrInvalidRange, r.URL) return @@ -364,64 +329,33 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req } } - objInfo, reader, err := getObjectNInfo(ctx, bucket, object, rs) - if err != nil { - writeErrorResponse(w, toAPIErrorCode(err), r.URL) - return - } - - // If object is encrypted, we avoid the cache layer. - if crypto.IsEncrypted(objInfo.UserDefined) && api.CacheAPI() != nil { - // Close the existing reader before re-querying the backend - if reader != nil { - reader.Close() - } - // Query the backend - objInfo, reader, err = objectAPI.GetObjectNInfo(ctx, bucket, object, rs) - if err != nil { - writeErrorResponse(w, toAPIErrorCode(err), r.URL) - return - } - } - defer reader.Close() - - if objectAPI.IsEncryptionSupported() { - if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil { - writeErrorResponse(w, toAPIErrorCode(err), r.URL) - return - } - } - // Validate pre-conditions if any. if checkPreconditions(w, r, objInfo) { return } - startOffset, length := int64(0), objInfo.Size - if rs != nil { - startOffset, length = rs.GetOffsetLength(objInfo.Size) + // Get the object. + var startOffset int64 + length := objInfo.Size + if hrange != nil { + startOffset = hrange.offsetBegin + length = hrange.getLength() } - // Get the object. + var writer io.Writer + writer = w if objectAPI.IsEncryptionSupported() { - s3Encrypted := crypto.IsEncrypted(objInfo.UserDefined) - if s3Encrypted { - var encReader io.Reader - encReader, startOffset, length, err = DecryptBlocksRequestR(reader, r, bucket, object, startOffset, length, objInfo, false) + s3Encrypted := crypto.S3.IsEncrypted(objInfo.UserDefined) + if crypto.SSEC.IsRequested(r.Header) || s3Encrypted { + // Response writer should be limited early on for decryption upto required length, + // additionally also skipping mod(offset)64KiB boundaries. + writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length) + + writer, startOffset, length, err = DecryptBlocksRequest(writer, r, bucket, object, startOffset, length, objInfo, false) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } - // Resulting reader should be limited early on - // for decryption upto required length, - // additionally also skipping mod(offset)64KiB - // boundaries. - encReader = io.LimitReader(ioutil.NewSkipReader(encReader, startOffset%(64*1024)), length) - cleanUp := func() { reader.Close() } - reader = NewGetObjectReader(encReader, nil, cleanUp) - if reader != nil { - defer reader.Close() - } if s3Encrypted { w.Header().Set(crypto.SSEHeader, crypto.SSEAlgorithmAES256) } else { @@ -431,19 +365,24 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req } } - setObjectHeaders(w, objInfo, rs) + setObjectHeaders(w, objInfo, hrange) setHeadGetRespHeaders(w, r.URL.Query()) + getObject := objectAPI.GetObject + if api.CacheAPI() != nil && !crypto.SSEC.IsRequested(r.Header) && !crypto.S3.IsEncrypted(objInfo.UserDefined) { + getObject = api.CacheAPI().GetObject + } + statusCodeWritten := false - httpWriter := ioutil.WriteOnClose(w) + httpWriter := ioutil.WriteOnClose(writer) - if rs != nil { + if hrange != nil && hrange.offsetBegin > -1 { statusCodeWritten = true w.WriteHeader(http.StatusPartialContent) } - // Write object content to response body - if _, err = io.Copy(httpWriter, reader); err != nil { + // Reads the object at startOffset and writes to mw. + if err = getObject(ctx, bucket, object, startOffset, length, httpWriter, objInfo.ETag); err != nil { if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet writeErrorResponse(w, toAPIErrorCode(err), r.URL) } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 4cbb727cc..ee65adfff 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -550,11 +550,6 @@ func (s *xlSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err err // --- Object Operations --- -// GetObjectNInfo -func (s *xlSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { - return s.getHashedSet(object).GetObjectNInfo(ctx, bucket, object, rs) -} - // GetObject - reads an object from the hashedSet based on the object name. func (s *xlSets) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error { return s.getHashedSet(object).GetObject(ctx, bucket, object, startOffset, length, writer, etag) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 36ff58f11..2c5b575d0 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -17,7 +17,6 @@ package cmd import ( - "bytes" "context" "encoding/hex" "io" @@ -163,54 +162,6 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc return objInfo, nil } -func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { - - // Acquire lock - lock := xl.nsMutex.NewNSLock(bucket, object) - if err = lock.GetRLock(globalObjectTimeout); err != nil { - return objInfo, nil, err - } - - if err = checkGetObjArgs(ctx, bucket, object); err != nil { - return objInfo, nil, err - } - - if hasSuffix(object, slashSeparator) { - if !xl.isObjectDir(bucket, object) { - return objInfo, nil, toObjectErr(errFileNotFound, bucket, object) - } - var e error - if objInfo, e = xl.getObjectInfoDir(ctx, bucket, object); e != nil { - return objInfo, nil, toObjectErr(e, bucket, object) - } - objReader := NewGetObjectReader(bytes.NewReader(nil), lock, nil) - return objInfo, objReader, nil - } - - objInfo, err = xl.getObjectInfo(ctx, bucket, object) - if err != nil { - return objInfo, nil, toObjectErr(err, bucket, object) - } - - startOffset, readLength := int64(0), objInfo.Size - if rs != nil { - startOffset, readLength = rs.GetOffsetLength(objInfo.Size) - } - - pr, pw := io.Pipe() - objReader := NewGetObjectReader(pr, lock, nil) - go func() { - err := xl.getObject(ctx, bucket, object, startOffset, readLength, pw, "") - if err != nil { - pw.CloseWithError(err) - return - } - pw.Close() - }() - - return objInfo, objReader, nil -} - // GetObject - reads an object erasured coded across multiple // disks. Supports additional parameters like offset and length // which are synonymous with HTTP Range requests. diff --git a/pkg/ioutil/ioutil.go b/pkg/ioutil/ioutil.go index bcedf84a6..91827eb4f 100644 --- a/pkg/ioutil/ioutil.go +++ b/pkg/ioutil/ioutil.go @@ -126,34 +126,3 @@ func (nopCloser) Close() error { return nil } func NopCloser(w io.Writer) io.WriteCloser { return nopCloser{w} } - -// SkipReader skips a given number of bytes and then returns all -// remaining data. -type SkipReader struct { - io.Reader - - skipCount int64 -} - -func (s *SkipReader) Read(p []byte) (int, error) { - l := int64(len(p)) - if l == 0 { - return 0, nil - } - for s.skipCount > 0 { - if l > s.skipCount { - l = s.skipCount - } - n, err := s.Reader.Read(p[:l]) - if err != nil { - return 0, err - } - s.skipCount -= int64(n) - } - return s.Reader.Read(p) -} - -// NewSkipReader - creates a SkipReader -func NewSkipReader(r io.Reader, n int64) io.Reader { - return &SkipReader{r, n} -}