From 5b05df215a71e1de8b46e239d5a4efa49281c014 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 28 Aug 2018 13:08:30 -0700 Subject: [PATCH] Change SelectAPI to use new GetObjectNInfo API (#6373) This PR also removes some double checks --- cmd/object-handlers.go | 235 +++++++++++++++++++++++------------------ 1 file changed, 131 insertions(+), 104 deletions(-) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d2b1c9ed3..38629c4bf 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -73,10 +73,6 @@ func setHeadGetRespHeaders(w http.ResponseWriter, reqParams url.Values) { // also specify a data serialization format (JSON, CSV) of the object. func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "SelectObject") - var object, bucket string - vars := mux.Vars(r) - bucket = vars["bucket"] - object = vars["object"] // Fetch object stat info. objectAPI := api.ObjectAPI() @@ -85,27 +81,43 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r return } - getObjectInfo := objectAPI.GetObjectInfo - if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo + if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error + writeErrorResponse(w, ErrBadRequest, r.URL) + return } + vars := mux.Vars(r) + bucket := vars["bucket"] + object := vars["object"] + + // Check for auth type to return S3 compatible error. + // type to return the correct error (NoSuchKey vs AccessDenied) if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone { if getRequestAuthType(r) == authTypeAnonymous { // As per "Permission" section in - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html If - // the object you request does not exist, the error Amazon S3 returns - // depends on whether you also have the s3:ListBucket permission. * If you - // have the s3:ListBucket permission on the bucket, Amazon S3 will return - // an HTTP status code 404 ("no such key") error. * if you don’t have the - // s3:ListBucket permission, Amazon S3 will return an HTTP status code 403 - // ("access denied") error.` + // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html + // If the object you request does not exist, + // the error Amazon S3 returns depends on + // whether you also have the s3:ListBucket + // permission. + // * If you have the s3:ListBucket permission + // on the bucket, Amazon S3 will return an + // HTTP status code 404 ("no such key") + // error. + // * if you don’t have the s3:ListBucket + // permission, Amazon S3 will return an HTTP + // status code 403 ("access denied") error.` if globalPolicySys.IsAllowed(policy.Args{ Action: policy.ListBucketAction, BucketName: bucket, ConditionValues: getConditionValues(r, ""), IsOwner: false, }) { + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + _, err := getObjectInfo(ctx, bucket, object) if toAPIErrorCode(err) == ErrNoSuchKey { s3Error = ErrNoSuchKey @@ -115,25 +127,55 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r writeErrorResponse(w, s3Error, r.URL) return } + + // Get request range. + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + writeErrorResponse(w, ErrUnsupportedRangeHeader, r.URL) + return + } + if r.ContentLength <= 0 { writeErrorResponse(w, ErrEmptyRequestBody, r.URL) return } + var selectReq ObjectSelectRequest if err := xmlDecoder(r.Body, &selectReq, r.ContentLength); err != nil { writeErrorResponse(w, ErrMalformedXML, r.URL) return } - objInfo, err := getObjectInfo(ctx, bucket, object) - if err != nil { - writeErrorResponse(w, toAPIErrorCode(err), r.URL) + if !strings.EqualFold(string(selectReq.ExpressionType), "SQL") { + writeErrorResponse(w, ErrInvalidExpressionType, r.URL) return } - // Get request range. - rangeHeader := r.Header.Get("Range") - if rangeHeader != "" { - writeErrorResponse(w, ErrUnsupportedRangeHeader, r.URL) + if len(selectReq.Expression) >= s3select.MaxExpressionLength { + writeErrorResponse(w, ErrExpressionTooLong, r.URL) + return + } + if selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoUse && + selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoNone && + selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoIgnore && + selectReq.InputSerialization.CSV.FileHeaderInfo != "" { + writeErrorResponse(w, ErrInvalidFileHeaderInfo, r.URL) + return + } + if selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAlways && + selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAsNeeded && + selectReq.OutputSerialization.CSV.QuoteFields != "" { + writeErrorResponse(w, ErrInvalidQuoteFields, r.URL) + return + } + + getObjectNInfo := objectAPI.GetObjectNInfo + if api.CacheAPI() != nil { + getObjectNInfo = api.CacheAPI().GetObjectNInfo + } + + objInfo, reader, err := getObjectNInfo(ctx, bucket, object, nil) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } @@ -157,63 +199,48 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r return } } - if !strings.EqualFold(string(selectReq.ExpressionType), "SQL") { - writeErrorResponse(w, ErrInvalidExpressionType, r.URL) - return - } - if len(selectReq.Expression) >= s3select.MaxExpressionLength { - writeErrorResponse(w, ErrExpressionTooLong, r.URL) - return - } - if selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoUse && - selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoNone && - selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoIgnore && - selectReq.InputSerialization.CSV.FileHeaderInfo != "" { - writeErrorResponse(w, ErrInvalidFileHeaderInfo, r.URL) - return - } - if selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAlways && - selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAsNeeded && - selectReq.OutputSerialization.CSV.QuoteFields != "" { - writeErrorResponse(w, ErrInvalidQuoteFields, r.URL) - return - } - getObject := objectAPI.GetObject - if api.CacheAPI() != nil && !crypto.SSEC.IsRequested(r.Header) { - getObject = api.CacheAPI().GetObject + // If object is encrypted, we avoid the cache layer. + if crypto.IsEncrypted(objInfo.UserDefined) && api.CacheAPI() != nil { + // Close the existing reader before re-querying the backend + if reader != nil { + reader.Close() + } + // Query the backend + objInfo, reader, err = objectAPI.GetObjectNInfo(ctx, bucket, object, nil) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } } + defer reader.Close() - reader, pipewriter := io.Pipe() - + startOffset, length := int64(0), objInfo.Size // Get the object. - var startOffset int64 - length := objInfo.Size - - var writer io.Writer - writer = pipewriter if objectAPI.IsEncryptionSupported() { - if crypto.SSEC.IsRequested(r.Header) { - // Response writer should be limited early on for decryption upto required length, - // additionally also skipping mod(offset)64KiB boundaries. - writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length) - - writer, startOffset, length, err = DecryptBlocksRequest(writer, r, bucket, - object, startOffset, length, objInfo, false) + s3Encrypted := crypto.IsEncrypted(objInfo.UserDefined) + if s3Encrypted { + var encReader io.Reader + encReader, startOffset, length, err = DecryptBlocksRequestR(reader, r, bucket, object, startOffset, length, objInfo, false) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } + // Resulting reader should be limited early on + // for decryption upto required length, + // additionally also skipping mod(offset)64KiB + // boundaries. + encReader = io.LimitReader(ioutil.NewSkipReader(encReader, startOffset%(64*1024)), length) + cleanUp := func() { reader.Close() } + reader = NewGetObjectReader(encReader, nil, cleanUp) + if s3Encrypted { + w.Header().Set(crypto.SSEHeader, crypto.SSEAlgorithmAES256) + } else { + w.Header().Set(crypto.SSECAlgorithm, r.Header.Get(crypto.SSECAlgorithm)) + w.Header().Set(crypto.SSECKeyMD5, r.Header.Get(crypto.SSECKeyMD5)) + } } } - go func() { - defer reader.Close() - if gerr := getObject(ctx, bucket, object, 0, objInfo.Size, writer, objInfo.ETag); gerr != nil { - pipewriter.CloseWithError(gerr) - return - } - pipewriter.Close() // Close writer explicitly signaling we wrote all data. - }() //s3select //Options if selectReq.OutputSerialization.CSV.FieldDelimiter == "" { @@ -272,35 +299,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req bucket := vars["bucket"] object := vars["object"] - getObjectNInfo := objectAPI.GetObjectNInfo - if api.CacheAPI() != nil { - getObjectNInfo = api.CacheAPI().GetObjectNInfo - } - - // Get request range. - var rs *HTTPRangeSpec - rangeHeader := r.Header.Get("Range") - if rangeHeader != "" { - var err error - if rs, err = parseRequestRangeSpec(rangeHeader); err != nil { - // Handle only errInvalidRange. Ignore other - // parse error and treat it as regular Get - // request like Amazon S3. - if err == errInvalidRange { - writeErrorResponse(w, ErrInvalidRange, r.URL) - return - } - - // log the error. - logger.LogIf(ctx, err) - } - } - - objInfo, reader, err := getObjectNInfo(ctx, bucket, object, rs) - if reader != nil { - defer reader.Close() - } - // Before check err value above, we need to check the auth + // Check for auth type to return S3 compatible error. // type to return the correct error (NoSuchKey vs AccessDenied) if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone { if getRequestAuthType(r) == authTypeAnonymous { @@ -323,6 +322,12 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req ConditionValues: getConditionValues(r, ""), IsOwner: false, }) { + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + _, err := getObjectInfo(ctx, bucket, object) if toAPIErrorCode(err) == ErrNoSuchKey { s3Error = ErrNoSuchKey } @@ -331,29 +336,51 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(w, s3Error, r.URL) return } + + getObjectNInfo := objectAPI.GetObjectNInfo + if api.CacheAPI() != nil { + getObjectNInfo = api.CacheAPI().GetObjectNInfo + } + + // Get request range. + var rs *HTTPRangeSpec + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + var err error + if rs, err = parseRequestRangeSpec(rangeHeader); err != nil { + // Handle only errInvalidRange. Ignore other + // parse error and treat it as regular Get + // request like Amazon S3. + if err == errInvalidRange { + writeErrorResponse(w, ErrInvalidRange, r.URL) + return + } + + // log the error. + logger.LogIf(ctx, err) + } + } + + objInfo, reader, err := getObjectNInfo(ctx, bucket, object, rs) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } // If object is encrypted, we avoid the cache layer. - isEncrypted := objectAPI.IsEncryptionSupported() && (crypto.SSEC.IsRequested(r.Header) || - crypto.S3.IsEncrypted(objInfo.UserDefined)) - if isEncrypted && api.CacheAPI() != nil { + if crypto.IsEncrypted(objInfo.UserDefined) && api.CacheAPI() != nil { // Close the existing reader before re-querying the backend if reader != nil { reader.Close() } // Query the backend objInfo, reader, err = objectAPI.GetObjectNInfo(ctx, bucket, object, rs) - if reader != nil { - defer reader.Close() - } if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } } + defer reader.Close() if objectAPI.IsEncryptionSupported() { if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil { @@ -374,8 +401,8 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req // Get the object. if objectAPI.IsEncryptionSupported() { - s3Encrypted := crypto.S3.IsEncrypted(objInfo.UserDefined) - if crypto.SSEC.IsRequested(r.Header) || s3Encrypted { + s3Encrypted := crypto.IsEncrypted(objInfo.UserDefined) + if s3Encrypted { var encReader io.Reader encReader, startOffset, length, err = DecryptBlocksRequestR(reader, r, bucket, object, startOffset, length, objInfo, false) if err != nil {