|
|
|
@ -351,23 +351,42 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// If many failures, check the cache state.
|
|
|
|
|
if retries > 10 { |
|
|
|
|
err := o.checkMetacacheState(ctx, rpc) |
|
|
|
|
if debugPrint { |
|
|
|
|
logger.Info("waiting for first part (%s), err: %v", o.objectPath(0), err) |
|
|
|
|
} |
|
|
|
|
if err != nil { |
|
|
|
|
return entries, err |
|
|
|
|
} |
|
|
|
|
retries = 1 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const retryDelay = 500 * time.Millisecond |
|
|
|
|
// Load first part metadata...
|
|
|
|
|
// All operations are performed without locks, so we must be careful and allow for failures.
|
|
|
|
|
// Read metadata associated with the object from a disk.
|
|
|
|
|
if retries > 0 { |
|
|
|
|
disks := er.getOnlineDisks() |
|
|
|
|
if len(disks) == 0 { |
|
|
|
|
time.Sleep(retryDelay) |
|
|
|
|
retries++ |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false) |
|
|
|
|
if err != nil { |
|
|
|
|
time.Sleep(retryDelay) |
|
|
|
|
retries++ |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
|
|
|
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}) |
|
|
|
|
if err != nil { |
|
|
|
|
if err == errFileNotFound || errors.Is(err, errErasureReadQuorum) || errors.Is(err, InsufficientReadQuorum{}) { |
|
|
|
|
// Not ready yet...
|
|
|
|
|
if retries == 10 { |
|
|
|
|
err := o.checkMetacacheState(ctx, rpc) |
|
|
|
|
if debugPrint { |
|
|
|
|
logger.Info("waiting for first part (%s), err: %v", o.objectPath(0), err) |
|
|
|
|
} |
|
|
|
|
if err != nil { |
|
|
|
|
return entries, err |
|
|
|
|
} |
|
|
|
|
retries = -1 |
|
|
|
|
} |
|
|
|
|
retries++ |
|
|
|
|
time.Sleep(retryDelay) |
|
|
|
|
continue |
|
|
|
@ -414,28 +433,41 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if partN != loadedPart { |
|
|
|
|
if retries > 10 { |
|
|
|
|
err := o.checkMetacacheState(ctx, rpc) |
|
|
|
|
if debugPrint { |
|
|
|
|
logger.Info("waiting for part data (%v), err: %v", o.objectPath(partN), err) |
|
|
|
|
} |
|
|
|
|
if err != nil { |
|
|
|
|
return entries, err |
|
|
|
|
} |
|
|
|
|
retries = 1 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if retries > 0 { |
|
|
|
|
// Load from one disk only
|
|
|
|
|
disks := er.getOnlineDisks() |
|
|
|
|
if len(disks) == 0 { |
|
|
|
|
time.Sleep(retryDelay) |
|
|
|
|
retries++ |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false) |
|
|
|
|
if err != nil { |
|
|
|
|
time.Sleep(retryDelay) |
|
|
|
|
retries++ |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Load first part metadata...
|
|
|
|
|
fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}) |
|
|
|
|
switch err { |
|
|
|
|
case errFileNotFound, errErasureReadQuorum, InsufficientReadQuorum{}: |
|
|
|
|
if retries >= 10 { |
|
|
|
|
err := o.checkMetacacheState(ctx, rpc) |
|
|
|
|
if debugPrint { |
|
|
|
|
logger.Info("waiting for part data (%v), err: %v", o.objectPath(partN), err) |
|
|
|
|
} |
|
|
|
|
if err != nil { |
|
|
|
|
return entries, err |
|
|
|
|
} |
|
|
|
|
retries = -1 |
|
|
|
|
} |
|
|
|
|
time.Sleep(retryDelay) |
|
|
|
|
retries++ |
|
|
|
|
continue |
|
|
|
|
default: |
|
|
|
|
if retries >= 20 { |
|
|
|
|
// We had at least 10 retries without getting a result.
|
|
|
|
|
logger.LogIf(ctx, err) |
|
|
|
|
return entries, err |
|
|
|
|
} |
|
|
|
|
time.Sleep(retryDelay) |
|
|
|
|
retries++ |
|
|
|
|
continue |
|
|
|
@ -457,13 +489,8 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt |
|
|
|
|
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, &buf, fi, metaArr, onlineDisks) |
|
|
|
|
switch err { |
|
|
|
|
case errFileNotFound, errErasureReadQuorum, InsufficientReadQuorum{}: |
|
|
|
|
if retries >= 20 { |
|
|
|
|
// We had at least 10 retries without getting a result.
|
|
|
|
|
logger.LogIf(ctx, err) |
|
|
|
|
return entries, err |
|
|
|
|
} |
|
|
|
|
retries++ |
|
|
|
|
time.Sleep(retryDelay) |
|
|
|
|
retries++ |
|
|
|
|
continue |
|
|
|
|
default: |
|
|
|
|
logger.LogIf(ctx, err) |
|
|
|
@ -495,6 +522,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt |
|
|
|
|
return entries, io.EOF |
|
|
|
|
} |
|
|
|
|
partN++ |
|
|
|
|
retries = 0 |
|
|
|
|
case nil: |
|
|
|
|
// We stopped within the listing, we are done for now...
|
|
|
|
|
return entries, nil |
|
|
|
|