[feat] Preserve version supplied by client (#9854)

Just like GET/DELETE APIs it is possible to preserve
client supplied versionId's, of course the versionIds
have to be uuid, if an existing versionId is found
it is overwritten if no object locking policies
are found.

- PUT /bucketname/objectname?versionId=<id>
- POST /bucketname/objectname?uploads=&versionId=<id>
- PUT /bucketname/objectname?verisonId=<id> (with x-amz-copy-source)
master
Harshavardhana 5 years ago committed by GitHub
parent 8aae8b1d27
commit e79874f58e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      cmd/encryption-v1.go
  2. 3
      cmd/erasure-multipart.go
  3. 3
      cmd/erasure-object.go
  4. 7
      cmd/erasure-sets.go
  5. 8
      cmd/erasure-zones.go
  6. 1
      cmd/object-handlers.go
  7. 14
      cmd/xl-storage-format-v2.go

@ -963,13 +963,32 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts
// get ObjectOptions for PUT calls from encryption headers and metadata // get ObjectOptions for PUT calls from encryption headers and metadata
func putOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) { func putOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) {
versioned := globalBucketVersioningSys.Enabled(bucket) versioned := globalBucketVersioningSys.Enabled(bucket)
vid := strings.TrimSpace(r.URL.Query().Get("versionId"))
if vid != "" && vid != nullVersionID {
_, err := uuid.Parse(vid)
if err != nil {
logger.LogIf(ctx, err)
return opts, VersionNotFound{
Bucket: bucket,
Object: object,
VersionID: vid,
}
}
}
// In the case of multipart custom format, the metadata needs to be checked in addition to header to see if it // In the case of multipart custom format, the metadata needs to be checked in addition to header to see if it
// is SSE-S3 encrypted, primarily because S3 protocol does not require SSE-S3 headers in PutObjectPart calls // is SSE-S3 encrypted, primarily because S3 protocol does not require SSE-S3 headers in PutObjectPart calls
if GlobalGatewaySSE.SSES3() && (crypto.S3.IsRequested(r.Header) || crypto.S3.IsEncrypted(metadata)) { if GlobalGatewaySSE.SSES3() && (crypto.S3.IsRequested(r.Header) || crypto.S3.IsEncrypted(metadata)) {
return ObjectOptions{ServerSideEncryption: encrypt.NewSSE(), UserDefined: metadata, Versioned: versioned}, nil return ObjectOptions{
ServerSideEncryption: encrypt.NewSSE(),
UserDefined: metadata,
VersionID: vid,
Versioned: versioned,
}, nil
} }
if GlobalGatewaySSE.SSEC() && crypto.SSEC.IsRequested(r.Header) { if GlobalGatewaySSE.SSEC() && crypto.SSEC.IsRequested(r.Header) {
opts, err = getOpts(ctx, r, bucket, object) opts, err = getOpts(ctx, r, bucket, object)
opts.VersionID = vid
opts.Versioned = versioned opts.Versioned = versioned
opts.UserDefined = metadata opts.UserDefined = metadata
return return
@ -983,13 +1002,19 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
if err != nil { if err != nil {
return ObjectOptions{}, err return ObjectOptions{}, err
} }
return ObjectOptions{ServerSideEncryption: sseKms, UserDefined: metadata, Versioned: versioned}, nil return ObjectOptions{
ServerSideEncryption: sseKms,
UserDefined: metadata,
VersionID: vid,
Versioned: versioned,
}, nil
} }
// default case of passing encryption headers and UserDefined metadata to backend // default case of passing encryption headers and UserDefined metadata to backend
opts, err = getDefaultOpts(r.Header, false, metadata) opts, err = getDefaultOpts(r.Header, false, metadata)
if err != nil { if err != nil {
return opts, err return opts, err
} }
opts.VersionID = vid
opts.Versioned = versioned opts.Versioned = versioned
return opts, nil return opts, nil
} }

@ -142,8 +142,11 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
// Calculate the version to be saved. // Calculate the version to be saved.
if opts.Versioned { if opts.Versioned {
fi.VersionID = opts.VersionID
if fi.VersionID == "" {
fi.VersionID = mustGetUUID() fi.VersionID = mustGetUUID()
} }
}
fi.DataDir = mustGetUUID() fi.DataDir = mustGetUUID()
fi.ModTime = UTCNow() fi.ModTime = UTCNow()

@ -608,8 +608,11 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
fi := newFileInfo(object, dataDrives, parityDrives) fi := newFileInfo(object, dataDrives, parityDrives)
if opts.Versioned { if opts.Versioned {
fi.VersionID = opts.VersionID
if fi.VersionID == "" {
fi.VersionID = mustGetUUID() fi.VersionID = mustGetUUID()
} }
}
fi.DataDir = mustGetUUID() fi.DataDir = mustGetUUID()
// Initialize erasure metadata. // Initialize erasure metadata.

@ -794,7 +794,12 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
return srcSet.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) return srcSet.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
} }
putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined} putOpts := ObjectOptions{
ServerSideEncryption: dstOpts.ServerSideEncryption,
UserDefined: srcInfo.UserDefined,
Versioned: dstOpts.Versioned,
VersionID: dstOpts.VersionID,
}
return dstSet.putObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts) return dstSet.putObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
} }

@ -594,7 +594,13 @@ func (z *erasureZones) CopyObject(ctx context.Context, srcBucket, srcObject, dst
return objInfo, err return objInfo, err
} }
putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined} putOpts := ObjectOptions{
ServerSideEncryption: dstOpts.ServerSideEncryption,
UserDefined: srcInfo.UserDefined,
Versioned: dstOpts.Versioned,
VersionID: dstOpts.VersionID,
}
if cpSrcDstSame && srcInfo.metadataOnly { if cpSrcDstSame && srcInfo.metadataOnly {
return z.zones[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) return z.zones[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
} }

@ -1735,6 +1735,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
if srcOpts.ServerSideEncryption != nil { if srcOpts.ServerSideEncryption != nil {
getOpts.ServerSideEncryption = encrypt.SSE(srcOpts.ServerSideEncryption) getOpts.ServerSideEncryption = encrypt.SSE(srcOpts.ServerSideEncryption)
} }
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil) dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil)
if err != nil { if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))

@ -237,14 +237,16 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
return nil return nil
} }
var uv uuid.UUID if fi.VersionID == "" {
var err error // this means versioning is not yet
// null version Id means empty version Id. // enabled i.e all versions are basically
if fi.VersionID == nullVersionID { // default value i.e "null"
fi.VersionID = "" fi.VersionID = nullVersionID
} }
if fi.VersionID != "" { var uv uuid.UUID
var err error
if fi.VersionID != "" && fi.VersionID != nullVersionID {
uv, err = uuid.Parse(fi.VersionID) uv, err = uuid.Parse(fi.VersionID)
if err != nil { if err != nil {
return err return err

Loading…
Cancel
Save