You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
minio/cmd/metacache-set.go

969 lines
24 KiB

/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/hash"
)
type listPathOptions struct {
// ID of the listing.
// This will be used to persist the list.
ID string
// Bucket of the listing.
Bucket string
// Directory inside the bucket.
BaseDir string
// Scan/return only content with prefix.
Prefix string
// FilterPrefix will return only results with this prefix when scanning.
// Should never contain a slash.
// Prefix should still be set.
FilterPrefix string
// Marker to resume listing.
// The response will be the first entry AFTER this object name.
Marker string
// Limit the number of results.
Limit int
// The number of disks to ask. Special values:
// 0 uses default number of disks.
// -1 use at least 50% of disks or at least the default number.
AskDisks int
// InclDeleted will keep all entries where latest version is a delete marker.
InclDeleted bool
// Scan recursively.
// If false only main directory will be scanned.
// Should always be true if Separator is n SlashSeparator.
Recursive bool
// Separator to use.
Separator string
// Create indicates that the lister should not attempt to load an existing cache.
Create bool
// CurrentCycle indicates the current bloom cycle.
// Will be used if a new scan is started.
CurrentCycle uint64
// OldestCycle indicates the oldest cycle acceptable.
OldestCycle uint64
// Include pure directories.
IncludeDirectories bool
// Transient is set if the cache is transient due to an error or being a reserved bucket.
// This means the cache metadata will not be persisted on disk.
// A transient result will never be returned from the cache so knowing the list id is required.
Transient bool
// discardResult will not persist the cache to storage.
// When the initial results are returned listing will be canceled.
discardResult bool
}
func init() {
gob.Register(listPathOptions{})
}
// newMetacache constructs a new metacache from the options.
func (o listPathOptions) newMetacache() metacache {
return metacache{
id: o.ID,
bucket: o.Bucket,
root: o.BaseDir,
recursive: o.Recursive,
status: scanStateStarted,
error: "",
started: UTCNow(),
lastHandout: UTCNow(),
lastUpdate: UTCNow(),
ended: time.Time{},
startedCycle: o.CurrentCycle,
endedCycle: 0,
dataVersion: metacacheStreamVersion,
filter: o.FilterPrefix,
}
}
func (o *listPathOptions) debugf(format string, data ...interface{}) {
if serverDebugLog {
console.Debugf(format+"\n", data...)
}
}
func (o *listPathOptions) debugln(data ...interface{}) {
if serverDebugLog {
console.Debugln(data...)
}
}
// gatherResults will collect all results on the input channel and filter results according to the options.
// Caller should close the channel when done.
// The returned function will return the results once there is enough or input is closed.
func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCacheEntriesSorted, error) {
var resultsDone = make(chan metaCacheEntriesSorted)
// Copy so we can mutate
resCh := resultsDone
resErr := io.EOF
go func() {
var results metaCacheEntriesSorted
for entry := range in {
if resCh == nil {
// past limit
continue
}
if !o.IncludeDirectories && entry.isDir() {
continue
}
o.debugln("gather got:", entry.name)
if o.Marker != "" && entry.name <= o.Marker {
o.debugln("pre marker")
continue
}
if !strings.HasPrefix(entry.name, o.Prefix) {
o.debugln("not in prefix")
continue
}
if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
o.debugln("not in dir", o.Prefix, o.Separator)
continue
}
if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() {
o.debugln("latest is delete marker")
continue
}
if o.Limit > 0 && results.len() >= o.Limit {
// We have enough and we have more.
// Do not return io.EOF
if resCh != nil {
resErr = nil
resCh <- results
resCh = nil
}
continue
}
o.debugln("adding...")
results.o = append(results.o, entry)
}
if resCh != nil {
resErr = io.EOF
resCh <- results
}
}()
return func() (metaCacheEntriesSorted, error) {
return <-resultsDone, resErr
}
}
// findFirstPart will find the part with 0 being the first that corresponds to the marker in the options.
// io.ErrUnexpectedEOF is returned if the place containing the marker hasn't been scanned yet.
// io.EOF indicates the marker is beyond the end of the stream and does not exist.
func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) {
search := o.Marker
if search == "" {
search = o.Prefix
}
if search == "" {
return 0, nil
}
o.debugln("searching for ", search)
var tmp metacacheBlock
var json = jsoniter.ConfigCompatibleWithStandardLibrary
i := 0
for {
partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, i)
v, ok := fi.Metadata[partKey]
if !ok {
o.debugln("no match in metadata, waiting")
return -1, io.ErrUnexpectedEOF
}
err := json.Unmarshal([]byte(v), &tmp)
if !ok {
logger.LogIf(context.Background(), err)
return -1, err
}
if tmp.First == "" && tmp.Last == "" && tmp.EOS {
return 0, errFileNotFound
}
if tmp.First >= search {
o.debugln("First >= search", v)
return i, nil
}
if tmp.Last >= search {
o.debugln("Last >= search", v)
return i, nil
}
if tmp.EOS {
o.debugln("no match, at EOS", v)
return -3, io.EOF
}
o.debugln("First ", tmp.First, "<", search, " search", i)
i++
}
}
// updateMetacacheListing will update the metacache listing.
func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) {
if o.Transient {
return localMetacacheMgr.getTransient().updateCacheEntry(m)
}
if rpc == nil {
return localMetacacheMgr.updateCacheEntry(m)
}
return rpc.UpdateMetacacheListing(context.Background(), m)
}
func getMetacacheBlockInfo(fi FileInfo, block int) (*metacacheBlock, error) {
var tmp metacacheBlock
partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, block)
v, ok := fi.Metadata[partKey]
if !ok {
return nil, io.ErrUnexpectedEOF
}
return &tmp, json.Unmarshal([]byte(v), &tmp)
}
const metacachePrefix = ".metacache"
func metacachePrefixForID(bucket, id string) string {
return pathJoin(bucketMetaPrefix, bucket, metacachePrefix, id)
}
// objectPath returns the object path of the cache.
func (o *listPathOptions) objectPath(block int) string {
return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2")
}
func (o *listPathOptions) SetFilter() {
switch {
case metacacheSharePrefix:
return
case o.CurrentCycle != o.OldestCycle:
// We have a clean bloom filter
return
case o.Prefix == o.BaseDir:
// No additional prefix
return
}
// Remove basedir.
o.FilterPrefix = strings.TrimPrefix(o.Prefix, o.BaseDir)
// Remove leading and trailing slashes.
o.FilterPrefix = strings.Trim(o.FilterPrefix, slashSeparator)
if strings.Contains(o.FilterPrefix, slashSeparator) {
// Sanity check, should not happen.
o.FilterPrefix = ""
}
}
// filter will apply the options and return the number of objects requested by the limit.
// Will return io.EOF if there are no more entries with the same filter.
// The last entry can be used as a marker to resume the listing.
func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSorted, err error) {
// Forward to prefix, if any
err = r.forwardTo(o.Prefix)
if err != nil {
return entries, err
}
if o.Marker != "" {
err = r.forwardTo(o.Marker)
if err != nil {
return entries, err
}
next, err := r.peek()
if err != nil {
return entries, err
}
if next.name == o.Marker {
err := r.skip(1)
if err != nil {
return entries, err
}
}
}
o.debugln("forwarded to ", o.Prefix, "marker:", o.Marker, "sep:", o.Separator)
// Filter
if !o.Recursive {
entries.o = make(metaCacheEntries, 0, o.Limit)
pastPrefix := false
err := r.readFn(func(entry metaCacheEntry) bool {
if o.Prefix != "" && !strings.HasPrefix(entry.name, o.Prefix) {
// We are past the prefix, don't continue.
pastPrefix = true
return false
}
if !o.IncludeDirectories && entry.isDir() {
return true
}
if !entry.isInDir(o.Prefix, o.Separator) {
return true
}
if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() {
return entries.len() < o.Limit
}
entries.o = append(entries.o, entry)
return entries.len() < o.Limit
})
if (err != nil && err.Error() == io.EOF.Error()) || pastPrefix || r.nextEOF() {
return entries, io.EOF
}
return entries, err
}
// We should not need to filter more.
return r.readN(o.Limit, o.InclDeleted, o.IncludeDirectories, o.Prefix)
}
func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
retries := 0
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
for {
select {
case <-ctx.Done():
return entries, ctx.Err()
default:
}
// If many failures, check the cache state.
if retries > 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("remote listing canceled: %w", err)
}
retries = 1
}
const retryDelay = 500 * time.Millisecond
// Load first part metadata...
// All operations are performed without locks, so we must be careful and allow for failures.
// Read metadata associated with the object from a disk.
if retries > 0 {
disks := er.getOnlineDisks()
if len(disks) == 0 {
time.Sleep(retryDelay)
retries++
continue
}
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
}
// Read metadata associated with the object from all disks.
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true)
if err != nil {
switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
case ObjectNotFound:
retries++
time.Sleep(retryDelay)
continue
case InsufficientReadQuorum:
retries++
time.Sleep(retryDelay)
continue
default:
return entries, fmt.Errorf("reading first part metadata: %w", err)
}
}
partN, err := o.findFirstPart(fi)
switch {
case err == nil:
case errors.Is(err, io.ErrUnexpectedEOF):
if retries == 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("remote listing canceled: %w", err)
}
retries = -1
}
retries++
time.Sleep(retryDelay)
continue
case errors.Is(err, io.EOF):
return entries, io.EOF
}
// We got a stream to start at.
loadedPart := 0
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufferPool.Put(buf)
}()
for {
select {
case <-ctx.Done():
return entries, ctx.Err()
default:
}
if partN != loadedPart {
if retries > 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("waiting for next part %d: %w", partN, err)
}
retries = 1
}
if retries > 0 {
// Load from one disk only
disks := er.getOnlineDisks()
if len(disks) == 0 {
time.Sleep(retryDelay)
retries++
continue
}
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
}
// Load first part metadata...
fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
loadedPart = partN
bi, err := getMetacacheBlockInfo(fi, partN)
logger.LogIf(ctx, err)
if err == nil {
if bi.pastPrefix(o.Prefix) {
return entries, io.EOF
}
}
}
buf.Reset()
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, buf, fi, metaArr, onlineDisks)
if err != nil {
switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
case ObjectNotFound:
retries++
time.Sleep(retryDelay)
continue
case InsufficientReadQuorum:
retries++
time.Sleep(retryDelay)
continue
default:
logger.LogIf(ctx, err)
return entries, err
}
}
tmp, err := newMetacacheReader(buf)
if err != nil {
return entries, err
}
e, err := tmp.filter(o)
entries.o = append(entries.o, e.o...)
if o.Limit > 0 && entries.len() > o.Limit {
entries.truncate(o.Limit)
return entries, nil
}
if err == nil {
// We stopped within the listing, we are done for now...
return entries, nil
}
if !errors.Is(err, io.EOF) {
logger.LogIf(ctx, err)
return entries, err
}
// We finished at the end of the block.
// And should not expect any more results.
bi, err := getMetacacheBlockInfo(fi, partN)
logger.LogIf(ctx, err)
if err != nil || bi.EOS {
// We are done and there are no more parts.
return entries, io.EOF
}
if bi.endedPrefix(o.Prefix) {
// Nothing more for prefix.
return entries, io.EOF
}
partN++
retries = 0
}
}
}
func (er erasureObjects) SetDriveCount() int {
return er.setDriveCount
}
// Will return io.EOF if continuing would not yield more results.
func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
o.debugf(color.Green("listPath:")+" with options: %#v", o)
// See if we have the listing stored.
if !o.Create && !o.discardResult {
entries, err := er.streamMetadataParts(ctx, o)
if IsErr(err, []error{
nil,
context.Canceled,
context.DeadlineExceeded,
}...) {
// Expected good errors we don't need to return error.
return entries, nil
}
if !errors.Is(err, io.EOF) { // io.EOF is expected and should be returned but no need to log it.
// Log an return errors on unexpected errors.
logger.LogIf(ctx, err)
}
return entries, err
}
meta := o.newMetacache()
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
var metaMu sync.Mutex
o.debugln(color.Green("listPath:")+" scanning bucket:", o.Bucket, "basedir:", o.BaseDir, "prefix:", o.Prefix, "marker:", o.Marker)
// Disconnect from call above, but cancel on exit.
ctx, cancel := context.WithCancel(GlobalContext)
// We need to ask disks.
disks := er.getOnlineDisks()
defer func() {
o.debugln(color.Green("listPath:")+" returning:", entries.len(), "err:", err)
if err != nil && !errors.Is(err, io.EOF) {
go func(err string) {
metaMu.Lock()
if meta.status != scanStateError {
meta.error = err
meta.status = scanStateError
}
meta, _ = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
}(err.Error())
cancel()
}
}()
askDisks := o.AskDisks
listingQuorum := askDisks - 1
// Special case: ask all disks if the drive count is 4
if askDisks == -1 || er.SetDriveCount() == 4 {
askDisks = len(disks) // with 'strict' quorum list on all online disks.
listingQuorum = getReadQuorum(er.SetDriveCount())
}
if len(disks) < askDisks {
err = InsufficientReadQuorum{}
logger.LogIf(ctx, fmt.Errorf("listPath: Insufficient disks, %d of %d needed are available", len(disks), askDisks))
cancel()
return
}
// Select askDisks random disks.
if len(disks) > askDisks {
disks = disks[:askDisks]
}
// Create output for our results.
var cacheCh chan metaCacheEntry
if !o.discardResult {
cacheCh = make(chan metaCacheEntry, metacacheBlockSize)
}
// Create filter for results.
filterCh := make(chan metaCacheEntry, 100)
filteredResults := o.gatherResults(filterCh)
closeChannels := func() {
if !o.discardResult {
close(cacheCh)
}
close(filterCh)
}
// Cancel listing on return if non-saved list.
if o.discardResult {
defer cancel()
}
go func() {
defer cancel()
// Save continuous updates
go func() {
var err error
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
var exit bool
for !exit {
select {
case <-ticker.C:
case <-ctx.Done():
exit = true
}
metaMu.Lock()
meta.endedCycle = intDataUpdateTracker.current()
meta, err = o.updateMetacacheListing(meta, rpc)
if meta.status == scanStateError {
logger.LogIf(ctx, err)
cancel()
exit = true
}
metaMu.Unlock()
}
}()
const retryDelay = 200 * time.Millisecond
const maxTries = 5
var bw *metacacheBlockWriter
// Don't save single object listings.
if !o.discardResult {
// Write results to disk.
bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
// if the block is 0 bytes and its a first block skip it.
// skip only this for Transient caches.
if len(b.data) == 0 && b.n == 0 && o.Transient {
return nil
}
o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false)
logger.LogIf(ctx, err)
custom := b.headerKV()
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{
UserDefined: custom,
NoLock: true, // No need to hold namespace lock, each prefix caches uniquely.
ParentIsObject: nil,
})
if err != nil {
metaMu.Lock()
if meta.error != "" {
meta.status = scanStateError
meta.error = err.Error()
}
metaMu.Unlock()
cancel()
return err
}
if b.n == 0 {
return nil
}
// Update block 0 metadata.
var retries int
for {
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{})
if err == nil {
break
}
switch err.(type) {
case ObjectNotFound:
return err
case InsufficientReadQuorum:
default:
logger.LogIf(ctx, err)
}
if retries >= maxTries {
return err
}
retries++
time.Sleep(retryDelay)
}
return nil
})
}
// How to resolve results.
resolver := metadataResolutionParams{
dirQuorum: listingQuorum,
objQuorum: listingQuorum,
bucket: o.Bucket,
}
err := listPathRaw(ctx, listPathRawOptions{
disks: disks,
bucket: o.Bucket,
path: o.BaseDir,
recursive: o.Recursive,
filterPrefix: o.FilterPrefix,
minDisks: listingQuorum,
agreed: func(entry metaCacheEntry) {
if !o.discardResult {
cacheCh <- entry
}
filterCh <- entry
},
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
// Results Disagree :-(
entry, ok := entries.resolve(&resolver)
if ok {
if !o.discardResult {
cacheCh <- *entry
}
filterCh <- *entry
}
},
})
metaMu.Lock()
if err != nil {
meta.status = scanStateError
meta.error = err.Error()
}
// Save success
if meta.error == "" {
meta.status = scanStateSuccess
meta.endedCycle = intDataUpdateTracker.current()
}
meta, _ = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
closeChannels()
if !o.discardResult {
if err := bw.Close(); err != nil {
metaMu.Lock()
meta.error = err.Error()
meta.status = scanStateError
meta, _ = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
}
}
}()
return filteredResults()
}
type listPathRawOptions struct {
disks []StorageAPI
bucket, path string
recursive bool
filterPrefix string
// Minimum number of good disks to continue.
// An error will be returned if this many disks returned an error.
minDisks int
reportNotFound bool
// Callbacks with results:
// If set to nil, it will not be called.
// agreed is called if all disks agreed.
agreed func(entry metaCacheEntry)
// partial will be returned when there is disagreement between disks.
// if disk did not return any result, but also haven't errored
// the entry will be empty and errs will
partial func(entries metaCacheEntries, nAgreed int, errs []error)
// finished will be called when all streams have finished and
// more than one disk returned an error.
// Will not be called if everything operates as expected.
finished func(errs []error)
}
// listPathRaw will list a path on the provided drives.
// See listPathRawOptions on how results are delivered.
// Directories are always returned.
// Cache will be bypassed.
// Context cancellation will be respected but may take a while to effectuate.
func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
disks := opts.disks
if len(disks) == 0 {
return fmt.Errorf("listPathRaw: 0 drives provided")
}
// Disconnect from call above, but cancel on exit.
ctx, cancel := context.WithCancel(GlobalContext)
defer cancel()
askDisks := len(disks)
readers := make([]*metacacheReader, askDisks)
for i := range disks {
r, w := io.Pipe()
d := disks[i]
readers[i], err = newMetacacheReader(r)
if err != nil {
return err
}
// Send request to each disk.
go func() {
werr := d.WalkDir(ctx, WalkDirOptions{
Bucket: opts.bucket,
BaseDir: opts.path,
Recursive: opts.recursive,
ReportNotFound: opts.reportNotFound,
FilterPrefix: opts.filterPrefix}, w)
w.CloseWithError(werr)
if werr != io.EOF && werr != nil && werr.Error() != errFileNotFound.Error() && werr.Error() != errVolumeNotFound.Error() {
logger.LogIf(ctx, werr)
}
}()
}
topEntries := make(metaCacheEntries, len(readers))
errs := make([]error, len(readers))
for {
// Get the top entry from each
var current metaCacheEntry
var atEOF, fnf, hasErr, agree int
for i := range topEntries {
topEntries[i] = metaCacheEntry{}
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
for i, r := range readers {
if errs[i] != nil {
hasErr++
continue
}
entry, err := r.peek()
switch err {
case io.EOF:
atEOF++
continue
case nil:
default:
if err.Error() == errFileNotFound.Error() {
atEOF++
fnf++
continue
}
if err.Error() == errVolumeNotFound.Error() {
atEOF++
fnf++
continue
}
hasErr++
errs[i] = err
continue
}
// If no current, add it.
if current.name == "" {
topEntries[i] = entry
current = entry
agree++
continue
}
// If exact match, we agree.
if current.matches(&entry, opts.bucket) {
topEntries[i] = entry
agree++
continue
}
// If only the name matches we didn't agree, but add it for resolution.
if entry.name == current.name {
topEntries[i] = entry
continue
}
// We got different entries
if entry.name > current.name {
continue
}
// We got a new, better current.
// Clear existing entries.
for i := range topEntries[:i] {
topEntries[i] = metaCacheEntry{}
}
agree = 1
current = entry
topEntries[i] = entry
}
// Stop if we exceed number of bad disks
if hasErr > len(disks)-opts.minDisks && hasErr > 0 {
if opts.finished != nil {
opts.finished(errs)
}
var combinedErr []string
for i, err := range errs {
if err != nil {
combinedErr = append(combinedErr, fmt.Sprintf("disk %d returned: %s", i, err))
}
}
return errors.New(strings.Join(combinedErr, ", "))
}
// Break if all at EOF or error.
if atEOF+hasErr == len(readers) {
if hasErr > 0 && opts.finished != nil {
opts.finished(errs)
}
break
}
if fnf == len(readers) {
return errFileNotFound
}
if agree == len(readers) {
// Everybody agreed
for _, r := range readers {
r.skip(1)
}
if opts.agreed != nil {
opts.agreed(current)
}
continue
}
if opts.partial != nil {
opts.partial(topEntries, agree, errs)
}
// Skip the inputs we used.
for i, r := range readers {
if topEntries[i].name != "" {
r.skip(1)
}
}
}
return nil
}