Copy metadata before spawning goroutine + prealloc maps (#10458)

In `(*cacheObjects).GetObjectNInfo`, copy the object metadata before spawning the goroutine that writes the object back to the cache, so the map is not read inside the goroutine while the request path can still modify it.

Clean up a few map[string]string copies as well by introducing a `cloneMSS` helper and preallocating map capacities, reducing allocations and simplifying the code.

Fixes #10426
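
The race fixed here is the usual goroutine-capture pattern: the cache-fill goroutine built its metadata map from shared state while the request path could still touch it; building the map before `go func()` gives the goroutine its own snapshot. Below is a minimal, runnable sketch of that pattern (simplified names, not the actual MinIO call sites), reusing the `cloneMSS` helper added in this commit:

```go
package main

import (
	"fmt"
	"sync"
)

// cloneMSS mirrors the helper added in cmd/utils.go: it copies a
// map[string]string and returns an empty (never nil) map for nil input.
func cloneMSS(v map[string]string) map[string]string {
	r := make(map[string]string, len(v))
	for k, v := range v {
		r[k] = v
	}
	return r
}

func main() {
	meta := map[string]string{"etag": "abc123"}

	// Copy before spawning: the goroutine owns its snapshot, so the
	// write below cannot race with its read (verify with `go run -race`).
	snapshot := cloneMSS(meta)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("cached etag:", snapshot["etag"])
	}()

	meta["content-type"] = "text/plain" // safe: the goroutine never sees `meta`

	wg.Wait()
}
```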
Klaus Post, committed by GitHub
commit b7438fe4e6
parent ce6cef6855
16 changed files (changed-line count per file):

 1. cmd/crypto/metadata.go (6)
 2. cmd/disk-cache-backend.go (10)
 3. cmd/disk-cache.go (12)
 4. cmd/encryption-v1.go (12)
 5. cmd/erasure-multipart.go (15)
 6. cmd/fs-v1-multipart.go (2)
 7. cmd/fs-v1.go (10)
 8. cmd/gateway-common.go (4)
 9. cmd/gateway/azure/gateway-azure.go (2)
 10. cmd/logger/logger.go (5)
 11. cmd/logger/message/audit/entry.go (14)
 12. cmd/object-api-options.go (2)
 13. cmd/object-api-utils.go (2)
 14. cmd/signature-v2.go (2)
 15. cmd/utils.go (10)
 16. cmd/xl-storage-format-v2.go (2)

@@ -129,7 +129,7 @@ func (ssec) IsEncrypted(metadata map[string]string) bool {
 // metadata is nil.
 func CreateMultipartMetadata(metadata map[string]string) map[string]string {
     if metadata == nil {
-        metadata = map[string]string{}
+        return map[string]string{SSEMultipart: ""}
     }
     metadata[SSEMultipart] = ""
     return metadata
@@ -156,7 +156,7 @@ func (s3) CreateMetadata(metadata map[string]string, keyID string, kmsKey []byte
     }
 
     if metadata == nil {
-        metadata = map[string]string{}
+        metadata = make(map[string]string, 5)
     }
 
     metadata[SSESealAlgorithm] = sealedKey.Algorithm
@@ -236,7 +236,7 @@ func (ssec) CreateMetadata(metadata map[string]string, sealedKey SealedKey) map[
     }
 
     if metadata == nil {
-        metadata = map[string]string{}
+        metadata = make(map[string]string, 3)
     }
     metadata[SSESealAlgorithm] = SealAlgorithm
     metadata[SSEIV] = base64.StdEncoding.EncodeToString(sealedKey.IV[:])

@@ -696,10 +696,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
     if err := os.MkdirAll(cachePath, 0777); err != nil {
         return err
     }
-    var metadata = make(map[string]string)
-    for k, v := range opts.UserDefined {
-        metadata[k] = v
-    }
+    var metadata = cloneMSS(opts.UserDefined)
     var reader = data
     var actualSize = uint64(size)
     if globalCacheKMS != nil {
@@ -739,10 +736,7 @@ func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io
     if err := os.MkdirAll(cachePath, 0777); err != nil {
         return err
     }
-    var metadata = make(map[string]string)
-    for k, v := range opts.UserDefined {
-        metadata[k] = v
-    }
+    var metadata = cloneMSS(opts.UserDefined)
     var reader = data
     var actualSize = uint64(rlen)
     // objSize is the actual size of object (with encryption overhead if any)

@@ -85,16 +85,15 @@ type cacheObjects struct {
 }
 
 func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string, rs *HTTPRangeSpec) error {
-    metadata := make(map[string]string)
-    metadata["etag"] = eTag
+    metadata := map[string]string{"etag": eTag}
     return dcache.SaveMetadata(ctx, bucket, object, metadata, size, rs, "", true)
 }
 
 // Backend metadata could have changed through server side copy - reset cache metadata if that is the case
 func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *diskCache, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo, rs *HTTPRangeSpec) error {
-    bkMeta := make(map[string]string)
-    cacheMeta := make(map[string]string)
+    bkMeta := make(map[string]string, len(bkObjectInfo.UserDefined))
+    cacheMeta := make(map[string]string, len(cacheObjInfo.UserDefined))
     for k, v := range bkObjectInfo.UserDefined {
         if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
             // Do not need to send any internal metadata
@@ -166,7 +165,7 @@ func (c *cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects
 
 // construct a metadata k-v map
 func getMetadata(objInfo ObjectInfo) map[string]string {
-    metadata := make(map[string]string)
+    metadata := make(map[string]string, len(objInfo.UserDefined)+4)
     metadata["etag"] = objInfo.ETag
     metadata["content-type"] = objInfo.ContentType
     if objInfo.ContentEncoding != "" {
@@ -334,11 +333,12 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
     // Initialize pipe.
     pipeReader, pipeWriter := io.Pipe()
     teeReader := io.TeeReader(bkReader, pipeWriter)
+    userDefined := getMetadata(bkReader.ObjInfo)
     go func() {
         putErr := dcache.Put(ctx, bucket, object,
             io.LimitReader(pipeReader, bkReader.ObjInfo.Size),
             bkReader.ObjInfo.Size, nil, ObjectOptions{
-                UserDefined: getMetadata(bkReader.ObjInfo),
+                UserDefined: userDefined,
             }, false)
         // close the write end of the pipe, so the error gets
         // propagated to getObjReader

@@ -388,12 +388,7 @@ func DecryptBlocksRequestR(inputReader io.Reader, h http.Header, offset,
         object:            object,
         customerKeyHeader: h.Get(crypto.SSECKey),
         copySource:        copySource,
-    }
-
-    w.metadata = map[string]string{}
-    // Copy encryption metadata for internal use.
-    for k, v := range oi.UserDefined {
-        w.metadata[k] = v
+        metadata:          cloneMSS(oi.UserDefined),
     }
 
     if w.copySource {
@@ -432,10 +427,7 @@ type DecryptBlocksReader struct {
 }
 
 func (d *DecryptBlocksReader) buildDecrypter(partID int) error {
-    m := make(map[string]string)
-    for k, v := range d.metadata {
-        m[k] = v
-    }
+    m := cloneMSS(d.metadata)
     // Initialize the first decrypter; new decrypters will be
     // initialized in Read() operation as needed.
     var key []byte

@@ -280,10 +280,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 
     fi.DataDir = mustGetUUID()
     fi.ModTime = UTCNow()
-    fi.Metadata = map[string]string{}
-    for k, v := range opts.UserDefined {
-        fi.Metadata[k] = v
-    }
+    fi.Metadata = cloneMSS(opts.UserDefined)
 
     uploadID := mustGetUUID()
     uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
@@ -555,10 +552,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
         return result, err
     }
 
-    result.UserDefined = map[string]string{}
-    for k, v := range fi.Metadata {
-        result.UserDefined[k] = v
-    }
+    result.UserDefined = cloneMSS(fi.Metadata)
     return result, nil
 }
@@ -606,10 +600,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
     result.UploadID = uploadID
     result.MaxParts = maxParts
     result.PartNumberMarker = partNumberMarker
-    result.UserDefined = map[string]string{}
-    for k, v := range fi.Metadata {
-        result.UserDefined[k] = v
-    }
+    result.UserDefined = cloneMSS(fi.Metadata)
 
     // For empty number of parts or maxParts as zero, return right here.
     if len(fi.Parts) == 0 || maxParts == 0 {

@@ -731,7 +731,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
         return oi, toObjectErr(err, bucket, object)
     }
 
     // Save additional metadata.
-    if len(fsMeta.Meta) == 0 {
+    if fsMeta.Meta == nil {
         fsMeta.Meta = make(map[string]string)
     }
     fsMeta.Meta["etag"] = s3MD5

@@ -635,10 +635,7 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
         fsMeta = fs.defaultFsJSON(srcObject)
     }
 
-    fsMeta.Meta = map[string]string{}
-    for k, v := range srcInfo.UserDefined {
-        fsMeta.Meta[k] = v
-    }
+    fsMeta.Meta = cloneMSS(srcInfo.UserDefined)
     fsMeta.Meta["etag"] = srcInfo.ETag
     if _, err = fsMeta.WriteTo(wlk); err != nil {
         return oi, toObjectErr(err, srcBucket, srcObject)
@@ -1124,10 +1121,7 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
     data := r.Reader
 
     // No metadata is set, allocate a new one.
-    meta := make(map[string]string)
-    for k, v := range opts.UserDefined {
-        meta[k] = v
-    }
+    meta := cloneMSS(opts.UserDefined)
     var err error
 
     // Validate if bucket name is valid and exists.

@@ -57,7 +57,7 @@ var (
 
 // FromMinioClientMetadata converts minio metadata to map[string]string
 func FromMinioClientMetadata(metadata map[string][]string) map[string]string {
-    mm := map[string]string{}
+    mm := make(map[string]string, len(metadata))
     for k, v := range metadata {
         mm[http.CanonicalHeaderKey(k)] = v[0]
     }
@@ -227,7 +227,7 @@ func ToMinioClientObjectInfoMetadata(metadata map[string]string) map[string][]st
 
 // ToMinioClientMetadata converts metadata to map[string]string
 func ToMinioClientMetadata(metadata map[string]string) map[string]string {
-    mm := make(map[string]string)
+    mm := make(map[string]string, len(metadata))
     for k, v := range metadata {
         mm[http.CanonicalHeaderKey(k)] = v
     }

@@ -882,7 +882,7 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
     data := r.Reader
     if data.Size() > azureBlockSize/2 {
-        if len(opts.UserDefined) == 0 {
+        if opts.UserDefined == nil {
             opts.UserDefined = map[string]string{}
         }
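
The `len(opts.UserDefined) == 0` to `opts.UserDefined == nil` change above (and the matching one in `CompleteMultipartUpload`) is slightly more precise than cosmetic: writes to a nil map panic, so nil is the only case that needs a fresh allocation; an empty-but-non-nil map is already writable and need not be replaced. A quick runnable illustration:

```go
package main

func main() {
	var m map[string]string // nil map: len(m) == 0, reads return zero values
	_ = m["k"]              // fine: reading a nil map is allowed
	// m["k"] = "v"         // would panic: assignment to entry in nil map

	m = map[string]string{} // empty but non-nil: len(m) is still 0...
	m["k"] = "v"            // ...yet writable, so no reallocation is needed
}
```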

@@ -331,8 +331,9 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) {
         API = req.API
     }
 
-    tags := make(map[string]string)
-    for _, entry := range req.GetTags() {
+    kv := req.GetTags()
+    tags := make(map[string]string, len(kv))
+    for _, entry := range kv {
         tags[entry.Key] = entry.Val
     }

@@ -53,16 +53,18 @@ type Entry struct {
 
 // ToEntry - constructs an audit entry object.
 func ToEntry(w http.ResponseWriter, r *http.Request, reqClaims map[string]interface{}, deploymentID string) Entry {
-    reqQuery := make(map[string]string)
-    for k, v := range r.URL.Query() {
+    q := r.URL.Query()
+    reqQuery := make(map[string]string, len(q))
+    for k, v := range q {
         reqQuery[k] = strings.Join(v, ",")
     }
-    reqHeader := make(map[string]string)
+    reqHeader := make(map[string]string, len(r.Header))
     for k, v := range r.Header {
         reqHeader[k] = strings.Join(v, ",")
     }
-    respHeader := make(map[string]string)
-    for k, v := range w.Header() {
+    wh := w.Header()
+    respHeader := make(map[string]string, len(wh))
+    for k, v := range wh {
         respHeader[k] = strings.Join(v, ",")
     }
     respHeader[xhttp.ETag] = strings.Trim(respHeader[xhttp.ETag], `"`)
@@ -71,7 +73,7 @@ func ToEntry(w http.ResponseWriter, r *http.Request, reqClaims map[string]interf
         Version:      Version,
         DeploymentID: deploymentID,
         RemoteHost:   handlers.GetSourceIP(r),
-        RequestID:    w.Header().Get(xhttp.AmzRequestID),
+        RequestID:    wh.Get(xhttp.AmzRequestID),
         UserAgent:    r.UserAgent(),
         Time:         time.Now().UTC().Format(time.RFC3339Nano),
         ReqQuery:     reqQuery,
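
Hoisting `r.URL.Query()` and `w.Header()` into locals here does more than enable the `len(...)` capacity hints: `URL.Query()` re-parses the raw query string and allocates a fresh `url.Values` on every call, and the saved `wh` lets the later `RequestID` lookup reuse the same header map. A small runnable sketch of the `Query()` behavior:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	u, _ := url.Parse("https://example.com/object?id=42&versionId=7")

	// u.Query() parses u.RawQuery and allocates a new url.Values each
	// time it is called, so parse once and reuse the result.
	q := u.Query()
	fmt.Println(q.Get("id"), q.Get("versionId")) // 42 7
}
```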

@@ -168,7 +168,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
     etag := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceETag))
     if etag != "" {
         if metadata == nil {
-            metadata = make(map[string]string)
+            metadata = make(map[string]string, 1)
         }
         metadata["etag"] = etag
     }

@@ -270,7 +270,7 @@ func removeStandardStorageClass(metadata map[string]string) map[string]string {
 // cleanMetadataKeys takes keyNames to be filtered
 // and returns a new map with all the entries with keyNames removed.
 func cleanMetadataKeys(metadata map[string]string, keyNames ...string) map[string]string {
-    var newMeta = make(map[string]string)
+    var newMeta = make(map[string]string, len(metadata))
     for k, v := range metadata {
         if contains(keyNames, k) {
             continue

@@ -321,7 +321,7 @@ func compareSignatureV2(sig1, sig2 string) bool {
 // Return canonical headers.
 func canonicalizedAmzHeadersV2(headers http.Header) string {
     var keys []string
-    keyval := make(map[string]string)
+    keyval := make(map[string]string, len(headers))
     for key := range headers {
         lkey := strings.ToLower(key)
         if !strings.HasPrefix(lkey, "x-amz-") {

@@ -112,6 +112,16 @@ func getWriteQuorum(drive int) int {
     return quorum
 }
 
+// cloneMSS will clone a map[string]string.
+// If input is nil an empty map is returned, not nil.
+func cloneMSS(v map[string]string) map[string]string {
+    r := make(map[string]string, len(v))
+    for k, v := range v {
+        r[k] = v
+    }
+    return r
+}
+
 // URI scheme constants.
 const (
     httpScheme = "http"
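
One detail of the new helper worth noting: `make(map[string]string, len(v))` works even when `v` is nil (`len` of a nil map is 0), so `cloneMSS` always hands back a writable, non-nil map. That is what lets call sites such as `fi.Metadata = cloneMSS(opts.UserDefined)` drop their nil checks and assign into the result immediately:

```go
// Assumes the cloneMSS helper from the hunk above is in scope.
m := cloneMSS(nil)   // never nil, even for nil input
m["etag"] = "abc123" // safe to write without a nil check
fmt.Println(len(m))  // 1
```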

@@ -268,7 +268,7 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
             PartSizes:       make([]int64, len(fi.Parts)),
             PartActualSizes: make([]int64, len(fi.Parts)),
             MetaSys:         make(map[string][]byte),
-            MetaUser:        make(map[string]string),
+            MetaUser:        make(map[string]string, len(fi.Metadata)),
         },
     }
