|
|
|
@ -1,5 +1,5 @@ |
|
|
|
|
/* |
|
|
|
|
* MinIO Cloud Storage, (C) 2016, 2017, 2018 MinIO, Inc. |
|
|
|
|
* MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc. |
|
|
|
|
* |
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -142,20 +142,19 @@ func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, r |
|
|
|
|
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var objInfo ObjectInfo |
|
|
|
|
objInfo, err = xl.getObjectInfo(ctx, bucket, object, opts) |
|
|
|
|
meta, metaArr, onlineDisks, err := xl.getObjectXLMeta(ctx, bucket, object, opts) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, toObjectErr(err, bucket, object) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts) |
|
|
|
|
fn, off, length, nErr := NewGetObjectReader(rs, meta.ToObjectInfo(bucket, object), opts) |
|
|
|
|
if nErr != nil { |
|
|
|
|
return nil, nErr |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pr, pw := io.Pipe() |
|
|
|
|
go func() { |
|
|
|
|
err := xl.getObject(ctx, bucket, object, off, length, pw, "", opts) |
|
|
|
|
err := xl.getObjectWithXLMeta(ctx, bucket, object, off, length, pw, "", opts, meta, metaArr, onlineDisks) |
|
|
|
|
pw.CloseWithError(err) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
@ -173,12 +172,6 @@ func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, r |
|
|
|
|
// startOffset indicates the starting read location of the object.
|
|
|
|
|
// length indicates the total length of the object.
|
|
|
|
|
func (xl xlObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { |
|
|
|
|
return xl.getObject(ctx, bucket, object, startOffset, length, writer, etag, opts) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// getObject wrapper for xl GetObject
|
|
|
|
|
func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { |
|
|
|
|
|
|
|
|
|
if err := checkGetObjArgs(ctx, bucket, object); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -202,28 +195,10 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO |
|
|
|
|
return toObjectErr(err, bucket, object) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
|
|
|
metaArr, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object) |
|
|
|
|
|
|
|
|
|
// get Quorum for this object
|
|
|
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs) |
|
|
|
|
if err != nil { |
|
|
|
|
return toObjectErr(err, bucket, object) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { |
|
|
|
|
return toObjectErr(reducedErr, bucket, object) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// List all online disks.
|
|
|
|
|
onlineDisks, modTime := listOnlineDisks(xl.getDisks(), metaArr, errs) |
|
|
|
|
|
|
|
|
|
// Pick latest valid metadata.
|
|
|
|
|
xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
return xl.getObject(ctx, bucket, object, startOffset, length, writer, etag, opts) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (xl xlObjects) getObjectWithXLMeta(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions, xlMeta xlMetaV1, metaArr []xlMetaV1, onlineDisks []StorageAPI) error { |
|
|
|
|
// Reorder online disks based on erasure distribution order.
|
|
|
|
|
onlineDisks = shuffleDisks(onlineDisks, xlMeta.Erasure.Distribution) |
|
|
|
|
|
|
|
|
@ -330,6 +305,15 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// getObject wrapper for xl GetObject
|
|
|
|
|
func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { |
|
|
|
|
xlMeta, metaArr, onlineDisks, err := xl.getObjectXLMeta(ctx, bucket, object, opts) |
|
|
|
|
if err != nil { |
|
|
|
|
return toObjectErr(err, bucket, object) |
|
|
|
|
} |
|
|
|
|
return xl.getObjectWithXLMeta(ctx, bucket, object, startOffset, length, writer, etag, opts, xlMeta, metaArr, onlineDisks) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// getObjectInfoDir - This getObjectInfo is specific to object directory lookup.
|
|
|
|
|
func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (ObjectInfo, error) { |
|
|
|
|
storageDisks := xl.getDisks() |
|
|
|
@ -383,8 +367,7 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, op |
|
|
|
|
return info, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
|
|
|
|
|
func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string, opt ObjectOptions) (objInfo ObjectInfo, err error) { |
|
|
|
|
func (xl xlObjects) getObjectXLMeta(ctx context.Context, bucket, object string, opt ObjectOptions) (xlMeta xlMetaV1, metaArr []xlMetaV1, onlineDisks []StorageAPI, err error) { |
|
|
|
|
disks := xl.getDisks() |
|
|
|
|
|
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
|
|
@ -392,21 +375,30 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string, op |
|
|
|
|
|
|
|
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs) |
|
|
|
|
if err != nil { |
|
|
|
|
return objInfo, err |
|
|
|
|
return xlMeta, nil, nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// List all the file commit ids from parts metadata.
|
|
|
|
|
modTimes := listObjectModtimes(metaArr, errs) |
|
|
|
|
|
|
|
|
|
// Reduce list of UUIDs to a single common value.
|
|
|
|
|
modTime, _ := commonTime(modTimes) |
|
|
|
|
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { |
|
|
|
|
return xlMeta, nil, nil, toObjectErr(reducedErr, bucket, object) |
|
|
|
|
} |
|
|
|
|
// List all online disks.
|
|
|
|
|
onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs) |
|
|
|
|
|
|
|
|
|
// Pick latest valid metadata.
|
|
|
|
|
xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum) |
|
|
|
|
xlMeta, err = pickValidXLMeta(ctx, metaArr, modTime, readQuorum) |
|
|
|
|
if err != nil { |
|
|
|
|
return objInfo, err |
|
|
|
|
return xlMeta, nil, nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return xlMeta, metaArr, onlineDisks, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
|
|
|
|
|
func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string, opt ObjectOptions) (objInfo ObjectInfo, err error) { |
|
|
|
|
xlMeta, _, _, err := xl.getObjectXLMeta(ctx, bucket, object, opt) |
|
|
|
|
if err != nil { |
|
|
|
|
return objInfo, err |
|
|
|
|
} |
|
|
|
|
return xlMeta.ToObjectInfo(bucket, object), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|