@ -17,7 +17,26 @@
package cmd
import (
"context"
"encoding/xml"
"fmt"
"io"
"net/http"
"runtime"
"strings"
"time"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
sse "github.com/minio/minio/pkg/bucket/encryption"
"github.com/minio/minio/pkg/bucket/lifecycle"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/s3select"
)
const (
@ -46,3 +65,547 @@ func (sys *LifecycleSys) Get(bucketName string) (lc *lifecycle.Lifecycle, err er
func NewLifecycleSys ( ) * LifecycleSys {
return & LifecycleSys { }
}
type transitionState struct {
// add future metrics here
transitionCh chan ObjectInfo
}
func ( t * transitionState ) queueTransitionTask ( oi ObjectInfo ) {
select {
case t . transitionCh <- oi :
default :
}
}
var (
globalTransitionState * transitionState
globalTransitionConcurrent = runtime . GOMAXPROCS ( 0 ) / 2
)
func newTransitionState ( ) * transitionState {
// fix minimum concurrent transition to 1 for single CPU setup
if globalTransitionConcurrent == 0 {
globalTransitionConcurrent = 1
}
ts := & transitionState {
transitionCh : make ( chan ObjectInfo , 10000 ) ,
}
go func ( ) {
<- GlobalContext . Done ( )
close ( ts . transitionCh )
} ( )
return ts
}
// addWorker creates a new worker to process tasks
func ( t * transitionState ) addWorker ( ctx context . Context , objectAPI ObjectLayer ) {
// Add a new worker.
go func ( ) {
for {
select {
case <- ctx . Done ( ) :
return
case oi , ok := <- t . transitionCh :
if ! ok {
return
}
transitionObject ( ctx , objectAPI , oi )
}
}
} ( )
}
func initBackgroundTransition ( ctx context . Context , objectAPI ObjectLayer ) {
if globalTransitionState == nil {
return
}
// Start with globalTransitionConcurrent.
for i := 0 ; i < globalTransitionConcurrent ; i ++ {
globalTransitionState . addWorker ( ctx , objectAPI )
}
}
func validateLifecycleTransition ( ctx context . Context , bucket string , lfc * lifecycle . Lifecycle ) error {
for _ , rule := range lfc . Rules {
if rule . Transition . StorageClass != "" {
sameTarget , destbucket , err := validateTransitionDestination ( ctx , bucket , rule . Transition . StorageClass )
if err != nil {
return err
}
if sameTarget && destbucket == bucket {
return fmt . Errorf ( "Transition destination cannot be the same as the source bucket" )
}
}
}
return nil
}
// validateTransitionDestination returns error if transition destination bucket missing or not configured
// It also returns true if transition destination is same as this server.
func validateTransitionDestination ( ctx context . Context , bucket string , targetLabel string ) ( bool , string , error ) {
tgt := globalBucketTargetSys . GetRemoteTargetWithLabel ( ctx , bucket , targetLabel )
if tgt == nil {
return false , "" , BucketRemoteTargetNotFound { Bucket : bucket }
}
arn , err := madmin . ParseARN ( tgt . Arn )
if err != nil {
return false , "" , BucketRemoteTargetNotFound { Bucket : bucket }
}
clnt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , tgt . Arn )
if clnt == nil {
return false , "" , BucketRemoteTargetNotFound { Bucket : bucket }
}
if found , _ := clnt . BucketExists ( ctx , arn . Bucket ) ; ! found {
return false , "" , BucketRemoteDestinationNotFound { Bucket : arn . Bucket }
}
sameTarget , _ := isLocalHost ( clnt . EndpointURL ( ) . Hostname ( ) , clnt . EndpointURL ( ) . Port ( ) , globalMinioPort )
return sameTarget , arn . Bucket , nil
}
// return true if ARN representing transition storage class is present in a active rule
// for the lifecycle configured on this bucket
func transitionSCInUse ( ctx context . Context , lfc * lifecycle . Lifecycle , bucket , arnStr string ) bool {
tgtLabel := globalBucketTargetSys . GetRemoteLabelWithArn ( ctx , bucket , arnStr )
if tgtLabel == "" {
return false
}
for _ , rule := range lfc . Rules {
if rule . Status == Disabled {
continue
}
if rule . Transition . StorageClass != "" && rule . Transition . StorageClass == tgtLabel {
return true
}
}
return false
}
// set PutObjectOptions for PUT operation to transition data to target cluster
func putTransitionOpts ( objInfo ObjectInfo ) ( putOpts miniogo . PutObjectOptions ) {
meta := make ( map [ string ] string )
tag , err := tags . ParseObjectTags ( objInfo . UserTags )
if err != nil {
return
}
putOpts = miniogo . PutObjectOptions {
UserMetadata : meta ,
UserTags : tag . ToMap ( ) ,
ContentType : objInfo . ContentType ,
ContentEncoding : objInfo . ContentEncoding ,
StorageClass : objInfo . StorageClass ,
Internal : miniogo . AdvancedPutOptions {
SourceVersionID : objInfo . VersionID ,
SourceMTime : objInfo . ModTime ,
SourceETag : objInfo . ETag ,
} ,
}
if mode , ok := objInfo . UserDefined [ xhttp . AmzObjectLockMode ] ; ok {
rmode := miniogo . RetentionMode ( mode )
putOpts . Mode = rmode
}
if retainDateStr , ok := objInfo . UserDefined [ xhttp . AmzObjectLockRetainUntilDate ] ; ok {
rdate , err := time . Parse ( time . RFC3339 , retainDateStr )
if err != nil {
return
}
putOpts . RetainUntilDate = rdate
}
if lhold , ok := objInfo . UserDefined [ xhttp . AmzObjectLockLegalHold ] ; ok {
putOpts . LegalHold = miniogo . LegalHoldStatus ( lhold )
}
return
}
// handle deletes of transitioned objects or object versions when one of the following is true:
// 1. temporarily restored copies of objects (restored with the PostRestoreObject API) expired.
// 2. life cycle expiry date is met on the object.
func deleteTransitionedObject ( ctx context . Context , objectAPI ObjectLayer , bucket , object string , objInfo ObjectInfo , lcOpts lifecycle . ObjectOpts , action lifecycle . Action ) error {
if objInfo . TransitionStatus == "" {
return nil
}
lc , err := globalLifecycleSys . Get ( bucket )
if err != nil {
return err
}
arn := getLifecycleTransitionTargetArn ( ctx , lc , bucket , lcOpts )
if arn == nil {
return fmt . Errorf ( "remote target not configured" )
}
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , arn . String ( ) )
if tgt == nil {
return fmt . Errorf ( "remote target not configured" )
}
var opts ObjectOptions
opts . Versioned = globalBucketVersioningSys . Enabled ( bucket )
opts . VersionID = objInfo . VersionID
switch action {
case lifecycle . DeleteRestoredAction , lifecycle . DeleteRestoredVersionAction :
// delete locally restored copy of object or object version
// from the source, while leaving metadata behind. The data on
// transitioned tier lies untouched and still accessible
opts . TransitionStatus = objInfo . TransitionStatus
_ , err = objectAPI . DeleteObject ( ctx , bucket , object , opts )
return err
case lifecycle . DeleteAction , lifecycle . DeleteVersionAction :
// When an object is past expiry, delete the data from transitioned tier and
// metadata from source
if err := tgt . RemoveObject ( ctx , arn . Bucket , object , miniogo . RemoveObjectOptions { VersionID : objInfo . VersionID } ) ; err != nil {
logger . LogIf ( ctx , err )
}
_ , err = objectAPI . DeleteObject ( ctx , bucket , object , opts )
if err != nil {
return err
}
eventName := event . ObjectRemovedDelete
if objInfo . DeleteMarker {
eventName = event . ObjectRemovedDeleteMarkerCreated
}
// Notify object deleted event.
sendEvent ( eventArgs {
EventName : eventName ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [ILM-EXPIRY]" ,
} )
}
// should never reach here
return nil
}
// transition object to target specified by the transition ARN. When an object is transitioned to another
// storage specified by the transition ARN, the metadata is left behind on source cluster and original content
// is moved to the transition tier. Note that in the case of encrypted objects, entire encrypted stream is moved
// to the transition tier without decrypting or re-encrypting.
func transitionObject ( ctx context . Context , objectAPI ObjectLayer , objInfo ObjectInfo ) error {
lc , err := globalLifecycleSys . Get ( objInfo . Bucket )
if err != nil {
return err
}
lcOpts := lifecycle . ObjectOpts {
Name : objInfo . Name ,
UserTags : objInfo . UserTags ,
}
arn := getLifecycleTransitionTargetArn ( ctx , lc , objInfo . Bucket , lcOpts )
if arn == nil {
return fmt . Errorf ( "remote target not configured" )
}
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , arn . String ( ) )
if tgt == nil {
return fmt . Errorf ( "remote target not configured" )
}
gr , err := objectAPI . GetObjectNInfo ( ctx , objInfo . Bucket , objInfo . Name , nil , http . Header { } , readLock , ObjectOptions {
VersionID : objInfo . VersionID ,
TransitionStatus : lifecycle . TransitionPending ,
} )
if err != nil {
return err
}
oi := gr . ObjInfo
if oi . TransitionStatus == lifecycle . TransitionComplete {
return nil
}
putOpts := putTransitionOpts ( oi )
if _ , err = tgt . PutObject ( ctx , arn . Bucket , oi . Name , gr , oi . Size , "" , "" , putOpts ) ; err != nil {
return err
}
gr . Close ( )
var opts ObjectOptions
opts . Versioned = globalBucketVersioningSys . Enabled ( oi . Bucket )
opts . VersionID = oi . VersionID
opts . TransitionStatus = lifecycle . TransitionComplete
if _ , err = objectAPI . DeleteObject ( ctx , oi . Bucket , oi . Name , opts ) ; err != nil {
return err
}
return nil
}
// getLifecycleTransitionTargetArn returns transition ARN for storage class specified in the config.
func getLifecycleTransitionTargetArn ( ctx context . Context , lc * lifecycle . Lifecycle , bucket string , obj lifecycle . ObjectOpts ) * madmin . ARN {
for _ , rule := range lc . FilterActionableRules ( obj ) {
if rule . Transition . StorageClass != "" {
return globalBucketTargetSys . GetRemoteArnWithLabel ( ctx , bucket , rule . Transition . StorageClass )
}
}
return nil
}
// getTransitionedObjectReader returns a reader from the transitioned tier.
func getTransitionedObjectReader ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , h http . Header , oi ObjectInfo , opts ObjectOptions ) ( gr * GetObjectReader , err error ) {
var lc * lifecycle . Lifecycle
lc , err = globalLifecycleSys . Get ( bucket )
if err != nil {
return nil , err
}
arn := getLifecycleTransitionTargetArn ( ctx , lc , bucket , lifecycle . ObjectOpts {
Name : object ,
UserTags : oi . UserTags ,
ModTime : oi . ModTime ,
VersionID : oi . VersionID ,
DeleteMarker : oi . DeleteMarker ,
IsLatest : oi . IsLatest ,
} )
if arn == nil {
return nil , fmt . Errorf ( "remote target not configured" )
}
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , arn . String ( ) )
if tgt == nil {
return nil , fmt . Errorf ( "remote target not configured" )
}
fn , off , length , err := NewGetObjectReader ( rs , oi , opts )
if err != nil {
return nil , ErrorRespToObjectError ( err , bucket , object )
}
gopts := miniogo . GetObjectOptions { VersionID : opts . VersionID }
// get correct offsets for encrypted object
if off >= 0 && length >= 0 {
if err := gopts . SetRange ( off , off + length - 1 ) ; err != nil {
return nil , ErrorRespToObjectError ( err , bucket , object )
}
}
reader , _ , _ , err := tgt . GetObject ( ctx , arn . Bucket , object , gopts )
if err != nil {
return nil , err
}
closeReader := func ( ) { reader . Close ( ) }
return fn ( reader , h , opts . CheckPrecondFn , closeReader )
}
// RestoreRequestType represents type of restore.
type RestoreRequestType string
const (
// SelectRestoreRequest specifies select request. This is the only valid value
SelectRestoreRequest RestoreRequestType = "SELECT"
)
// Encryption specifies encryption setting on restored bucket
type Encryption struct {
EncryptionType sse . SSEAlgorithm ` xml:"EncryptionType" `
KMSContext string ` xml:"KMSContext,omitempty" `
KMSKeyID string ` xml:"KMSKeyId,omitempty" `
}
// MetadataEntry denotes name and value.
type MetadataEntry struct {
Name string ` xml:"Name" `
Value string ` xml:"Value" `
}
// S3Location specifies s3 location that receives result of a restore object request
type S3Location struct {
BucketName string ` xml:"BucketName,omitempty" `
Encryption Encryption ` xml:"Encryption,omitempty" `
Prefix string ` xml:"Prefix,omitempty" `
StorageClass string ` xml:"StorageClass,omitempty" `
Tagging * tags . Tags ` xml:"Tagging,omitempty" `
UserMetadata [ ] MetadataEntry ` xml:"UserMetadata" `
}
// OutputLocation specifies bucket where object needs to be restored
type OutputLocation struct {
S3 S3Location ` xml:"S3,omitempty" `
}
// IsEmpty returns true if output location not specified.
func ( o * OutputLocation ) IsEmpty ( ) bool {
return o . S3 . BucketName == ""
}
// SelectParameters specifies sql select parameters
type SelectParameters struct {
s3select . S3Select
}
// IsEmpty returns true if no select parameters set
func ( sp * SelectParameters ) IsEmpty ( ) bool {
return sp == nil || sp . S3Select == s3select . S3Select { }
}
var (
selectParamsXMLName = "SelectParameters"
)
// UnmarshalXML - decodes XML data.
func ( sp * SelectParameters ) UnmarshalXML ( d * xml . Decoder , start xml . StartElement ) error {
// Essentially the same as S3Select barring the xml name.
if start . Name . Local == selectParamsXMLName {
start . Name = xml . Name { Space : "" , Local : "SelectRequest" }
}
return sp . S3Select . UnmarshalXML ( d , start )
}
// RestoreObjectRequest - xml to restore a transitioned object
type RestoreObjectRequest struct {
XMLName xml . Name ` xml:"http://s3.amazonaws.com/doc/2006-03-01/ RestoreRequest" json:"-" `
Days int ` xml:"Days,omitempty" `
Type RestoreRequestType ` xml:"Type,omitempty" `
Tier string ` xml:"Tier,-" `
Description string ` xml:"Description,omitempty" `
SelectParameters * SelectParameters ` xml:"SelectParameters,omitempty" `
OutputLocation OutputLocation ` xml:"OutputLocation,omitempty" `
}
// Maximum 2MiB size per restore object request.
const maxRestoreObjectRequestSize = 2 << 20
// parseRestoreRequest parses RestoreObjectRequest from xml
func parseRestoreRequest ( reader io . Reader ) ( * RestoreObjectRequest , error ) {
req := RestoreObjectRequest { }
if err := xml . NewDecoder ( io . LimitReader ( reader , maxRestoreObjectRequestSize ) ) . Decode ( & req ) ; err != nil {
return nil , err
}
return & req , nil
}
// validate a RestoreObjectRequest as per AWS S3 spec https://docs.aws.amazon.com/AmazonS3/latest/API/API_RestoreObject.html
func ( r * RestoreObjectRequest ) validate ( ctx context . Context , objAPI ObjectLayer ) error {
if r . Type != SelectRestoreRequest && ! r . SelectParameters . IsEmpty ( ) {
return fmt . Errorf ( "Select parameters can only be specified with SELECT request type" )
}
if r . Type == SelectRestoreRequest && r . SelectParameters . IsEmpty ( ) {
return fmt . Errorf ( "SELECT restore request requires select parameters to be specified" )
}
if r . Type != SelectRestoreRequest && ! r . OutputLocation . IsEmpty ( ) {
return fmt . Errorf ( "OutputLocation required only for SELECT request type" )
}
if r . Type == SelectRestoreRequest && r . OutputLocation . IsEmpty ( ) {
return fmt . Errorf ( "OutputLocation required for SELECT requests" )
}
if r . Days != 0 && r . Type == SelectRestoreRequest {
return fmt . Errorf ( "Days cannot be specified with SELECT restore request" )
}
if r . Days == 0 && r . Type != SelectRestoreRequest {
return fmt . Errorf ( "restoration days should be at least 1" )
}
// Check if bucket exists.
if ! r . OutputLocation . IsEmpty ( ) {
if _ , err := objAPI . GetBucketInfo ( ctx , r . OutputLocation . S3 . BucketName ) ; err != nil {
return err
}
if r . OutputLocation . S3 . Prefix == "" {
return fmt . Errorf ( "Prefix is a required parameter in OutputLocation" )
}
if r . OutputLocation . S3 . Encryption . EncryptionType != crypto . SSEAlgorithmAES256 {
return NotImplemented { }
}
}
return nil
}
// set ObjectOptions for PUT call to restore temporary copy of transitioned data
func putRestoreOpts ( bucket , object string , rreq * RestoreObjectRequest , objInfo ObjectInfo ) ( putOpts ObjectOptions ) {
meta := make ( map [ string ] string )
sc := rreq . OutputLocation . S3 . StorageClass
if sc == "" {
sc = objInfo . StorageClass
}
meta [ strings . ToLower ( xhttp . AmzStorageClass ) ] = sc
if rreq . Type == SelectRestoreRequest {
for _ , v := range rreq . OutputLocation . S3 . UserMetadata {
if ! strings . HasPrefix ( "x-amz-meta" , strings . ToLower ( v . Name ) ) {
meta [ "x-amz-meta-" + v . Name ] = v . Value
continue
}
meta [ v . Name ] = v . Value
}
meta [ xhttp . AmzObjectTagging ] = rreq . OutputLocation . S3 . Tagging . String ( )
if rreq . OutputLocation . S3 . Encryption . EncryptionType != "" {
meta [ crypto . SSEHeader ] = crypto . SSEAlgorithmAES256
}
return ObjectOptions {
Versioned : globalBucketVersioningSys . Enabled ( bucket ) ,
VersionSuspended : globalBucketVersioningSys . Suspended ( bucket ) ,
UserDefined : meta ,
}
}
for k , v := range objInfo . UserDefined {
meta [ k ] = v
}
meta [ xhttp . AmzObjectTagging ] = objInfo . UserTags
return ObjectOptions {
Versioned : globalBucketVersioningSys . Enabled ( bucket ) ,
VersionSuspended : globalBucketVersioningSys . Suspended ( bucket ) ,
UserDefined : meta ,
VersionID : objInfo . VersionID ,
MTime : objInfo . ModTime ,
Expires : objInfo . Expires ,
}
}
var (
errRestoreHDRMissing = fmt . Errorf ( "x-amz-restore header not found" )
errRestoreHDRMalformed = fmt . Errorf ( "x-amz-restore header malformed" )
)
// parse x-amz-restore header from user metadata to get the status of ongoing request and expiry of restoration
// if any. This header value is of format: ongoing-request=true|false, expires=time
func parseRestoreHeaderFromMeta ( meta map [ string ] string ) ( ongoing bool , expiry time . Time , err error ) {
restoreHdr , ok := meta [ xhttp . AmzRestore ]
if ! ok {
return ongoing , expiry , errRestoreHDRMissing
}
rslc := strings . SplitN ( restoreHdr , "," , 2 )
if len ( rslc ) != 2 {
return ongoing , expiry , errRestoreHDRMalformed
}
rstatusSlc := strings . SplitN ( rslc [ 0 ] , "=" , 2 )
if len ( rstatusSlc ) != 2 {
return ongoing , expiry , errRestoreHDRMalformed
}
rExpSlc := strings . SplitN ( rslc [ 1 ] , "=" , 2 )
if len ( rExpSlc ) != 2 {
return ongoing , expiry , errRestoreHDRMalformed
}
expiry , err = time . Parse ( http . TimeFormat , rExpSlc [ 1 ] )
if err != nil {
return
}
return rstatusSlc [ 1 ] == "true" , expiry , nil
}
// restoreTransitionedObject is similar to PostObjectRestore from AWS GLACIER
// storage class. When PostObjectRestore API is called, a temporary copy of the object
// is restored locally to the bucket on source cluster until the restore expiry date.
// The copy that was transitioned continues to reside in the transitioned tier.
func restoreTransitionedObject ( ctx context . Context , bucket , object string , objAPI ObjectLayer , objInfo ObjectInfo , rreq * RestoreObjectRequest , restoreExpiry time . Time ) error {
var rs * HTTPRangeSpec
gr , err := getTransitionedObjectReader ( ctx , bucket , object , rs , http . Header { } , objInfo , ObjectOptions {
VersionID : objInfo . VersionID } )
if err != nil {
return err
}
defer gr . Close ( )
hashReader , err := hash . NewReader ( gr , objInfo . Size , "" , "" , objInfo . Size , globalCLIContext . StrictS3Compat )
if err != nil {
return err
}
pReader := NewPutObjReader ( hashReader , nil , nil )
opts := putRestoreOpts ( bucket , object , rreq , objInfo )
opts . UserDefined [ xhttp . AmzRestore ] = fmt . Sprintf ( "ongoing-request=%t, expiry-date=%s" , false , restoreExpiry . Format ( http . TimeFormat ) )
if _ , err := objAPI . PutObject ( ctx , bucket , object , pReader , opts ) ; err != nil {
return err
}
return nil
}