You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
963 lines
24 KiB
963 lines
24 KiB
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/gob"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/color"
|
|
"github.com/minio/minio/pkg/console"
|
|
"github.com/minio/minio/pkg/hash"
|
|
)
|
|
|
|
type listPathOptions struct {
|
|
// ID of the listing.
|
|
// This will be used to persist the list.
|
|
ID string
|
|
|
|
// Bucket of the listing.
|
|
Bucket string
|
|
|
|
// Directory inside the bucket.
|
|
BaseDir string
|
|
|
|
// Scan/return only content with prefix.
|
|
Prefix string
|
|
|
|
// FilterPrefix will return only results with this prefix when scanning.
|
|
// Should never contain a slash.
|
|
// Prefix should still be set.
|
|
FilterPrefix string
|
|
|
|
// Marker to resume listing.
|
|
// The response will be the first entry AFTER this object name.
|
|
Marker string
|
|
|
|
// Limit the number of results.
|
|
Limit int
|
|
|
|
// The number of disks to ask. Special values:
|
|
// 0 uses default number of disks.
|
|
// -1 use at least 50% of disks or at least the default number.
|
|
AskDisks int
|
|
|
|
// InclDeleted will keep all entries where latest version is a delete marker.
|
|
InclDeleted bool
|
|
|
|
// Scan recursively.
|
|
// If false only main directory will be scanned.
|
|
// Should always be true if Separator is n SlashSeparator.
|
|
Recursive bool
|
|
|
|
// Separator to use.
|
|
Separator string
|
|
|
|
// Create indicates that the lister should not attempt to load an existing cache.
|
|
Create bool
|
|
|
|
// CurrentCycle indicates the current bloom cycle.
|
|
// Will be used if a new scan is started.
|
|
CurrentCycle uint64
|
|
|
|
// OldestCycle indicates the oldest cycle acceptable.
|
|
OldestCycle uint64
|
|
|
|
// Include pure directories.
|
|
IncludeDirectories bool
|
|
|
|
// Transient is set if the cache is transient due to an error or being a reserved bucket.
|
|
// This means the cache metadata will not be persisted on disk.
|
|
// A transient result will never be returned from the cache so knowing the list id is required.
|
|
Transient bool
|
|
|
|
// discardResult will not persist the cache to storage.
|
|
// When the initial results are returned listing will be canceled.
|
|
discardResult bool
|
|
}
|
|
|
|
func init() {
|
|
gob.Register(listPathOptions{})
|
|
}
|
|
|
|
// newMetacache constructs a new metacache from the options.
|
|
func (o listPathOptions) newMetacache() metacache {
|
|
return metacache{
|
|
id: o.ID,
|
|
bucket: o.Bucket,
|
|
root: o.BaseDir,
|
|
recursive: o.Recursive,
|
|
status: scanStateStarted,
|
|
error: "",
|
|
started: UTCNow(),
|
|
lastHandout: UTCNow(),
|
|
lastUpdate: UTCNow(),
|
|
ended: time.Time{},
|
|
startedCycle: o.CurrentCycle,
|
|
endedCycle: 0,
|
|
dataVersion: metacacheStreamVersion,
|
|
filter: o.FilterPrefix,
|
|
}
|
|
}
|
|
|
|
func (o *listPathOptions) debugf(format string, data ...interface{}) {
|
|
if serverDebugLog {
|
|
console.Debugf(format+"\n", data...)
|
|
}
|
|
}
|
|
|
|
func (o *listPathOptions) debugln(data ...interface{}) {
|
|
if serverDebugLog {
|
|
console.Debugln(data...)
|
|
}
|
|
}
|
|
|
|
// gatherResults will collect all results on the input channel and filter results according to the options.
|
|
// Caller should close the channel when done.
|
|
// The returned function will return the results once there is enough or input is closed.
|
|
func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCacheEntriesSorted, error) {
|
|
var resultsDone = make(chan metaCacheEntriesSorted)
|
|
// Copy so we can mutate
|
|
resCh := resultsDone
|
|
resErr := io.EOF
|
|
|
|
go func() {
|
|
var results metaCacheEntriesSorted
|
|
for entry := range in {
|
|
if resCh == nil {
|
|
// past limit
|
|
continue
|
|
}
|
|
if !o.IncludeDirectories && entry.isDir() {
|
|
continue
|
|
}
|
|
o.debugln("gather got:", entry.name)
|
|
if o.Marker != "" && entry.name <= o.Marker {
|
|
o.debugln("pre marker")
|
|
continue
|
|
}
|
|
if !strings.HasPrefix(entry.name, o.Prefix) {
|
|
o.debugln("not in prefix")
|
|
continue
|
|
}
|
|
if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
|
|
o.debugln("not in dir", o.Prefix, o.Separator)
|
|
continue
|
|
}
|
|
if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() {
|
|
o.debugln("latest is delete marker")
|
|
continue
|
|
}
|
|
if o.Limit > 0 && results.len() >= o.Limit {
|
|
// We have enough and we have more.
|
|
// Do not return io.EOF
|
|
if resCh != nil {
|
|
resErr = nil
|
|
resCh <- results
|
|
resCh = nil
|
|
}
|
|
continue
|
|
}
|
|
o.debugln("adding...")
|
|
results.o = append(results.o, entry)
|
|
}
|
|
if resCh != nil {
|
|
resErr = io.EOF
|
|
resCh <- results
|
|
}
|
|
}()
|
|
return func() (metaCacheEntriesSorted, error) {
|
|
return <-resultsDone, resErr
|
|
}
|
|
}
|
|
|
|
// findFirstPart will find the part with 0 being the first that corresponds to the marker in the options.
|
|
// io.ErrUnexpectedEOF is returned if the place containing the marker hasn't been scanned yet.
|
|
// io.EOF indicates the marker is beyond the end of the stream and does not exist.
|
|
func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) {
|
|
search := o.Marker
|
|
if search == "" {
|
|
search = o.Prefix
|
|
}
|
|
if search == "" {
|
|
return 0, nil
|
|
}
|
|
o.debugln("searching for ", search)
|
|
var tmp metacacheBlock
|
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
|
i := 0
|
|
for {
|
|
partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, i)
|
|
v, ok := fi.Metadata[partKey]
|
|
if !ok {
|
|
o.debugln("no match in metadata, waiting")
|
|
return -1, io.ErrUnexpectedEOF
|
|
}
|
|
err := json.Unmarshal([]byte(v), &tmp)
|
|
if !ok {
|
|
logger.LogIf(context.Background(), err)
|
|
return -1, err
|
|
}
|
|
if tmp.First == "" && tmp.Last == "" && tmp.EOS {
|
|
return 0, errFileNotFound
|
|
}
|
|
if tmp.First >= search {
|
|
o.debugln("First >= search", v)
|
|
return i, nil
|
|
}
|
|
if tmp.Last >= search {
|
|
o.debugln("Last >= search", v)
|
|
return i, nil
|
|
}
|
|
if tmp.EOS {
|
|
o.debugln("no match, at EOS", v)
|
|
return -3, io.EOF
|
|
}
|
|
o.debugln("First ", tmp.First, "<", search, " search", i)
|
|
i++
|
|
}
|
|
}
|
|
|
|
// updateMetacacheListing will update the metacache listing.
|
|
func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) {
|
|
if o.Transient {
|
|
return localMetacacheMgr.getTransient().updateCacheEntry(m)
|
|
}
|
|
if rpc == nil {
|
|
return localMetacacheMgr.updateCacheEntry(m)
|
|
}
|
|
return rpc.UpdateMetacacheListing(context.Background(), m)
|
|
}
|
|
|
|
func getMetacacheBlockInfo(fi FileInfo, block int) (*metacacheBlock, error) {
|
|
var tmp metacacheBlock
|
|
partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, block)
|
|
v, ok := fi.Metadata[partKey]
|
|
if !ok {
|
|
return nil, io.ErrUnexpectedEOF
|
|
}
|
|
return &tmp, json.Unmarshal([]byte(v), &tmp)
|
|
}
|
|
|
|
const metacachePrefix = ".metacache"
|
|
|
|
func metacachePrefixForID(bucket, id string) string {
|
|
return pathJoin(bucketMetaPrefix, bucket, metacachePrefix, id)
|
|
}
|
|
|
|
// objectPath returns the object path of the cache.
|
|
func (o *listPathOptions) objectPath(block int) string {
|
|
return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2")
|
|
}
|
|
|
|
func (o *listPathOptions) SetFilter() {
|
|
switch {
|
|
case metacacheSharePrefix:
|
|
return
|
|
case o.CurrentCycle != o.OldestCycle:
|
|
// We have a clean bloom filter
|
|
return
|
|
case o.Prefix == o.BaseDir:
|
|
// No additional prefix
|
|
return
|
|
}
|
|
// Remove basedir.
|
|
o.FilterPrefix = strings.TrimPrefix(o.Prefix, o.BaseDir)
|
|
// Remove leading and trailing slashes.
|
|
o.FilterPrefix = strings.Trim(o.FilterPrefix, slashSeparator)
|
|
|
|
if strings.Contains(o.FilterPrefix, slashSeparator) {
|
|
// Sanity check, should not happen.
|
|
o.FilterPrefix = ""
|
|
}
|
|
}
|
|
|
|
// filter will apply the options and return the number of objects requested by the limit.
|
|
// Will return io.EOF if there are no more entries with the same filter.
|
|
// The last entry can be used as a marker to resume the listing.
|
|
func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSorted, err error) {
|
|
// Forward to prefix, if any
|
|
err = r.forwardTo(o.Prefix)
|
|
if err != nil {
|
|
return entries, err
|
|
}
|
|
if o.Marker != "" {
|
|
err = r.forwardTo(o.Marker)
|
|
if err != nil {
|
|
return entries, err
|
|
}
|
|
next, err := r.peek()
|
|
if err != nil {
|
|
return entries, err
|
|
}
|
|
if next.name == o.Marker {
|
|
err := r.skip(1)
|
|
if err != nil {
|
|
return entries, err
|
|
}
|
|
}
|
|
}
|
|
o.debugln("forwarded to ", o.Prefix, "marker:", o.Marker, "sep:", o.Separator)
|
|
|
|
// Filter
|
|
if !o.Recursive {
|
|
entries.o = make(metaCacheEntries, 0, o.Limit)
|
|
pastPrefix := false
|
|
err := r.readFn(func(entry metaCacheEntry) bool {
|
|
if o.Prefix != "" && !strings.HasPrefix(entry.name, o.Prefix) {
|
|
// We are past the prefix, don't continue.
|
|
pastPrefix = true
|
|
return false
|
|
}
|
|
if !o.IncludeDirectories && entry.isDir() {
|
|
return true
|
|
}
|
|
if !entry.isInDir(o.Prefix, o.Separator) {
|
|
return true
|
|
}
|
|
if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() {
|
|
return entries.len() < o.Limit
|
|
}
|
|
entries.o = append(entries.o, entry)
|
|
return entries.len() < o.Limit
|
|
})
|
|
if (err != nil && err.Error() == io.EOF.Error()) || pastPrefix || r.nextEOF() {
|
|
return entries, io.EOF
|
|
}
|
|
return entries, err
|
|
}
|
|
|
|
// We should not need to filter more.
|
|
return r.readN(o.Limit, o.InclDeleted, o.IncludeDirectories, o.Prefix)
|
|
}
|
|
|
|
func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
|
|
retries := 0
|
|
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return entries, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
// If many failures, check the cache state.
|
|
if retries > 10 {
|
|
err := o.checkMetacacheState(ctx, rpc)
|
|
if err != nil {
|
|
return entries, fmt.Errorf("remote listing canceled: %w", err)
|
|
}
|
|
retries = 1
|
|
}
|
|
|
|
const retryDelay = 500 * time.Millisecond
|
|
// Load first part metadata...
|
|
// All operations are performed without locks, so we must be careful and allow for failures.
|
|
// Read metadata associated with the object from a disk.
|
|
if retries > 0 {
|
|
disks := er.getOnlineDisks()
|
|
if len(disks) == 0 {
|
|
time.Sleep(retryDelay)
|
|
retries++
|
|
continue
|
|
}
|
|
|
|
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
|
|
if err != nil {
|
|
time.Sleep(retryDelay)
|
|
retries++
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), false, ObjectOptions{})
|
|
if err != nil {
|
|
switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
|
|
case ObjectNotFound:
|
|
retries++
|
|
time.Sleep(retryDelay)
|
|
continue
|
|
case InsufficientReadQuorum:
|
|
retries++
|
|
time.Sleep(retryDelay)
|
|
continue
|
|
default:
|
|
return entries, fmt.Errorf("reading first part metadata: %w", err)
|
|
}
|
|
}
|
|
|
|
partN, err := o.findFirstPart(fi)
|
|
switch {
|
|
case err == nil:
|
|
case errors.Is(err, io.ErrUnexpectedEOF):
|
|
if retries == 10 {
|
|
err := o.checkMetacacheState(ctx, rpc)
|
|
if err != nil {
|
|
return entries, fmt.Errorf("remote listing canceled: %w", err)
|
|
}
|
|
retries = -1
|
|
}
|
|
retries++
|
|
time.Sleep(retryDelay)
|
|
continue
|
|
case errors.Is(err, io.EOF):
|
|
return entries, io.EOF
|
|
}
|
|
|
|
// We got a stream to start at.
|
|
loadedPart := 0
|
|
buf := bufferPool.Get().(*bytes.Buffer)
|
|
defer func() {
|
|
buf.Reset()
|
|
bufferPool.Put(buf)
|
|
}()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return entries, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if partN != loadedPart {
|
|
if retries > 10 {
|
|
err := o.checkMetacacheState(ctx, rpc)
|
|
if err != nil {
|
|
return entries, fmt.Errorf("waiting for next part %d: %w", partN, err)
|
|
}
|
|
retries = 1
|
|
}
|
|
|
|
if retries > 0 {
|
|
// Load from one disk only
|
|
disks := er.getOnlineDisks()
|
|
if len(disks) == 0 {
|
|
time.Sleep(retryDelay)
|
|
retries++
|
|
continue
|
|
}
|
|
|
|
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
|
|
if err != nil {
|
|
time.Sleep(retryDelay)
|
|
retries++
|
|
continue
|
|
}
|
|
}
|
|
// Load first part metadata...
|
|
fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), false, ObjectOptions{})
|
|
if err != nil {
|
|
time.Sleep(retryDelay)
|
|
retries++
|
|
continue
|
|
}
|
|
loadedPart = partN
|
|
bi, err := getMetacacheBlockInfo(fi, partN)
|
|
logger.LogIf(ctx, err)
|
|
if err == nil {
|
|
if bi.pastPrefix(o.Prefix) {
|
|
return entries, io.EOF
|
|
}
|
|
}
|
|
}
|
|
buf.Reset()
|
|
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, buf, fi, metaArr, onlineDisks)
|
|
if err != nil {
|
|
switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
|
|
case ObjectNotFound:
|
|
retries++
|
|
time.Sleep(retryDelay)
|
|
continue
|
|
case InsufficientReadQuorum:
|
|
retries++
|
|
time.Sleep(retryDelay)
|
|
continue
|
|
default:
|
|
logger.LogIf(ctx, err)
|
|
return entries, err
|
|
}
|
|
}
|
|
tmp, err := newMetacacheReader(buf)
|
|
if err != nil {
|
|
return entries, err
|
|
}
|
|
e, err := tmp.filter(o)
|
|
entries.o = append(entries.o, e.o...)
|
|
if o.Limit > 0 && entries.len() > o.Limit {
|
|
entries.truncate(o.Limit)
|
|
return entries, nil
|
|
}
|
|
if err == nil {
|
|
// We stopped within the listing, we are done for now...
|
|
return entries, nil
|
|
}
|
|
if !errors.Is(err, io.EOF) {
|
|
logger.LogIf(ctx, err)
|
|
return entries, err
|
|
}
|
|
|
|
// We finished at the end of the block.
|
|
// And should not expect any more results.
|
|
bi, err := getMetacacheBlockInfo(fi, partN)
|
|
logger.LogIf(ctx, err)
|
|
if err != nil || bi.EOS {
|
|
// We are done and there are no more parts.
|
|
return entries, io.EOF
|
|
}
|
|
if bi.endedPrefix(o.Prefix) {
|
|
// Nothing more for prefix.
|
|
return entries, io.EOF
|
|
}
|
|
partN++
|
|
retries = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
func (er erasureObjects) SetDriveCount() int {
|
|
return er.setDriveCount
|
|
}
|
|
|
|
// Will return io.EOF if continuing would not yield more results.
|
|
func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
|
|
o.debugf(color.Green("listPath:")+" with options: %#v", o)
|
|
|
|
// See if we have the listing stored.
|
|
if !o.Create && !o.discardResult {
|
|
entries, err := er.streamMetadataParts(ctx, o)
|
|
if IsErr(err, []error{
|
|
nil,
|
|
context.Canceled,
|
|
context.DeadlineExceeded,
|
|
}...) {
|
|
// Expected good errors we don't need to return error.
|
|
return entries, nil
|
|
}
|
|
|
|
if !errors.Is(err, io.EOF) { // io.EOF is expected and should be returned but no need to log it.
|
|
// Log an return errors on unexpected errors.
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
|
|
return entries, err
|
|
}
|
|
|
|
meta := o.newMetacache()
|
|
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
|
|
var metaMu sync.Mutex
|
|
|
|
o.debugln(color.Green("listPath:")+" scanning bucket:", o.Bucket, "basedir:", o.BaseDir, "prefix:", o.Prefix, "marker:", o.Marker)
|
|
|
|
// Disconnect from call above, but cancel on exit.
|
|
ctx, cancel := context.WithCancel(GlobalContext)
|
|
// We need to ask disks.
|
|
disks := er.getOnlineDisks()
|
|
|
|
defer func() {
|
|
o.debugln(color.Green("listPath:")+" returning:", entries.len(), "err:", err)
|
|
if err != nil && !errors.Is(err, io.EOF) {
|
|
go func(err string) {
|
|
metaMu.Lock()
|
|
if meta.status != scanStateError {
|
|
meta.error = err
|
|
meta.status = scanStateError
|
|
}
|
|
meta, _ = o.updateMetacacheListing(meta, rpc)
|
|
metaMu.Unlock()
|
|
}(err.Error())
|
|
cancel()
|
|
}
|
|
}()
|
|
|
|
askDisks := o.AskDisks
|
|
listingQuorum := askDisks - 1
|
|
// Special case: ask all disks if the drive count is 4
|
|
if askDisks == -1 || er.SetDriveCount() == 4 {
|
|
askDisks = len(disks) // with 'strict' quorum list on all online disks.
|
|
listingQuorum = getReadQuorum(er.SetDriveCount())
|
|
}
|
|
|
|
if len(disks) < askDisks {
|
|
err = InsufficientReadQuorum{}
|
|
logger.LogIf(ctx, fmt.Errorf("listPath: Insufficient disks, %d of %d needed are available", len(disks), askDisks))
|
|
cancel()
|
|
return
|
|
}
|
|
|
|
// Select askDisks random disks.
|
|
if len(disks) > askDisks {
|
|
disks = disks[:askDisks]
|
|
}
|
|
|
|
// Create output for our results.
|
|
var cacheCh chan metaCacheEntry
|
|
if !o.discardResult {
|
|
cacheCh = make(chan metaCacheEntry, metacacheBlockSize)
|
|
}
|
|
|
|
// Create filter for results.
|
|
filterCh := make(chan metaCacheEntry, 100)
|
|
filteredResults := o.gatherResults(filterCh)
|
|
closeChannels := func() {
|
|
if !o.discardResult {
|
|
close(cacheCh)
|
|
}
|
|
close(filterCh)
|
|
}
|
|
|
|
// Cancel listing on return if non-saved list.
|
|
if o.discardResult {
|
|
defer cancel()
|
|
}
|
|
|
|
go func() {
|
|
defer cancel()
|
|
// Save continuous updates
|
|
go func() {
|
|
var err error
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
var exit bool
|
|
for !exit {
|
|
select {
|
|
case <-ticker.C:
|
|
case <-ctx.Done():
|
|
exit = true
|
|
}
|
|
metaMu.Lock()
|
|
meta.endedCycle = intDataUpdateTracker.current()
|
|
meta, err = o.updateMetacacheListing(meta, rpc)
|
|
if meta.status == scanStateError {
|
|
cancel()
|
|
exit = true
|
|
}
|
|
metaMu.Unlock()
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}()
|
|
|
|
const retryDelay = 200 * time.Millisecond
|
|
const maxTries = 5
|
|
|
|
var bw *metacacheBlockWriter
|
|
// Don't save single object listings.
|
|
if !o.discardResult {
|
|
// Write results to disk.
|
|
bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
|
|
// if the block is 0 bytes and its a first block skip it.
|
|
// skip only this for Transient caches.
|
|
if len(b.data) == 0 && b.n == 0 && o.Transient {
|
|
return nil
|
|
}
|
|
o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
|
|
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false)
|
|
logger.LogIf(ctx, err)
|
|
custom := b.headerKV()
|
|
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{
|
|
UserDefined: custom,
|
|
NoLock: true, // No need to hold namespace lock, each prefix caches uniquely.
|
|
})
|
|
if err != nil {
|
|
metaMu.Lock()
|
|
if meta.error != "" {
|
|
meta.status = scanStateError
|
|
meta.error = err.Error()
|
|
}
|
|
metaMu.Unlock()
|
|
cancel()
|
|
return err
|
|
}
|
|
if b.n == 0 {
|
|
return nil
|
|
}
|
|
// Update block 0 metadata.
|
|
var retries int
|
|
for {
|
|
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{})
|
|
if err == nil {
|
|
break
|
|
}
|
|
switch err.(type) {
|
|
case ObjectNotFound:
|
|
return err
|
|
case InsufficientReadQuorum:
|
|
default:
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
if retries >= maxTries {
|
|
return err
|
|
}
|
|
retries++
|
|
time.Sleep(retryDelay)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// How to resolve results.
|
|
resolver := metadataResolutionParams{
|
|
dirQuorum: listingQuorum,
|
|
objQuorum: listingQuorum,
|
|
bucket: o.Bucket,
|
|
}
|
|
|
|
err := listPathRaw(ctx, listPathRawOptions{
|
|
disks: disks,
|
|
bucket: o.Bucket,
|
|
path: o.BaseDir,
|
|
recursive: o.Recursive,
|
|
filterPrefix: o.FilterPrefix,
|
|
minDisks: listingQuorum,
|
|
agreed: func(entry metaCacheEntry) {
|
|
if !o.discardResult {
|
|
cacheCh <- entry
|
|
}
|
|
filterCh <- entry
|
|
},
|
|
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
|
|
// Results Disagree :-(
|
|
entry, ok := entries.resolve(&resolver)
|
|
if ok {
|
|
if !o.discardResult {
|
|
cacheCh <- *entry
|
|
}
|
|
filterCh <- *entry
|
|
}
|
|
},
|
|
})
|
|
|
|
metaMu.Lock()
|
|
if err != nil {
|
|
meta.status = scanStateError
|
|
meta.error = err.Error()
|
|
}
|
|
// Save success
|
|
if meta.error == "" {
|
|
meta.status = scanStateSuccess
|
|
meta.endedCycle = intDataUpdateTracker.current()
|
|
}
|
|
|
|
meta, _ = o.updateMetacacheListing(meta, rpc)
|
|
metaMu.Unlock()
|
|
|
|
closeChannels()
|
|
if !o.discardResult {
|
|
if err := bw.Close(); err != nil {
|
|
metaMu.Lock()
|
|
meta.error = err.Error()
|
|
meta.status = scanStateError
|
|
meta, err = o.updateMetacacheListing(meta, rpc)
|
|
metaMu.Unlock()
|
|
}
|
|
}
|
|
}()
|
|
|
|
return filteredResults()
|
|
}
|
|
|
|
type listPathRawOptions struct {
|
|
disks []StorageAPI
|
|
bucket, path string
|
|
recursive bool
|
|
filterPrefix string
|
|
// Minimum number of good disks to continue.
|
|
// An error will be returned if this many disks returned an error.
|
|
minDisks int
|
|
reportNotFound bool
|
|
|
|
// Callbacks with results:
|
|
// If set to nil, it will not be called.
|
|
|
|
// agreed is called if all disks agreed.
|
|
agreed func(entry metaCacheEntry)
|
|
|
|
// partial will be returned when there is disagreement between disks.
|
|
// if disk did not return any result, but also haven't errored
|
|
// the entry will be empty and errs will
|
|
partial func(entries metaCacheEntries, nAgreed int, errs []error)
|
|
|
|
// finished will be called when all streams have finished and
|
|
// more than one disk returned an error.
|
|
// Will not be called if everything operates as expected.
|
|
finished func(errs []error)
|
|
}
|
|
|
|
// listPathRaw will list a path on the provided drives.
|
|
// See listPathRawOptions on how results are delivered.
|
|
// Directories are always returned.
|
|
// Cache will be bypassed.
|
|
// Context cancellation will be respected but may take a while to effectuate.
|
|
func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
|
|
disks := opts.disks
|
|
if len(disks) == 0 {
|
|
return fmt.Errorf("listPathRaw: 0 drives provided")
|
|
}
|
|
|
|
// Disconnect from call above, but cancel on exit.
|
|
ctx, cancel := context.WithCancel(GlobalContext)
|
|
defer cancel()
|
|
|
|
askDisks := len(disks)
|
|
readers := make([]*metacacheReader, askDisks)
|
|
for i := range disks {
|
|
r, w := io.Pipe()
|
|
d := disks[i]
|
|
readers[i], err = newMetacacheReader(r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Send request to each disk.
|
|
go func() {
|
|
err := d.WalkDir(ctx, WalkDirOptions{
|
|
Bucket: opts.bucket,
|
|
BaseDir: opts.path,
|
|
Recursive: opts.recursive,
|
|
ReportNotFound: opts.reportNotFound,
|
|
FilterPrefix: opts.filterPrefix}, w)
|
|
w.CloseWithError(err)
|
|
if err != io.EOF && err != nil && err.Error() != errFileNotFound.Error() {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
topEntries := make(metaCacheEntries, len(readers))
|
|
errs := make([]error, len(readers))
|
|
for {
|
|
// Get the top entry from each
|
|
var current metaCacheEntry
|
|
var atEOF, fnf, hasErr, agree int
|
|
for i := range topEntries {
|
|
topEntries[i] = metaCacheEntry{}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
for i, r := range readers {
|
|
if errs[i] != nil {
|
|
hasErr++
|
|
continue
|
|
}
|
|
entry, err := r.peek()
|
|
switch err {
|
|
case io.EOF:
|
|
atEOF++
|
|
continue
|
|
case nil:
|
|
default:
|
|
if err.Error() == errFileNotFound.Error() {
|
|
atEOF++
|
|
fnf++
|
|
continue
|
|
}
|
|
|
|
hasErr++
|
|
errs[i] = err
|
|
continue
|
|
}
|
|
// If no current, add it.
|
|
if current.name == "" {
|
|
topEntries[i] = entry
|
|
current = entry
|
|
agree++
|
|
continue
|
|
}
|
|
// If exact match, we agree.
|
|
if current.matches(&entry, opts.bucket) {
|
|
topEntries[i] = entry
|
|
agree++
|
|
continue
|
|
}
|
|
// If only the name matches we didn't agree, but add it for resolution.
|
|
if entry.name == current.name {
|
|
topEntries[i] = entry
|
|
continue
|
|
}
|
|
// We got different entries
|
|
if entry.name > current.name {
|
|
continue
|
|
}
|
|
// We got a new, better current.
|
|
// Clear existing entries.
|
|
for i := range topEntries[:i] {
|
|
topEntries[i] = metaCacheEntry{}
|
|
}
|
|
agree = 1
|
|
current = entry
|
|
topEntries[i] = entry
|
|
}
|
|
|
|
// Stop if we exceed number of bad disks
|
|
if hasErr > len(disks)-opts.minDisks && hasErr > 0 {
|
|
if opts.finished != nil {
|
|
opts.finished(errs)
|
|
}
|
|
var combinedErr []string
|
|
for i, err := range errs {
|
|
if err != nil {
|
|
combinedErr = append(combinedErr, fmt.Sprintf("disk %d returned: %s", i, err))
|
|
}
|
|
}
|
|
return errors.New(strings.Join(combinedErr, ", "))
|
|
}
|
|
|
|
// Break if all at EOF or error.
|
|
if atEOF+hasErr == len(readers) {
|
|
if hasErr > 0 && opts.finished != nil {
|
|
opts.finished(errs)
|
|
}
|
|
break
|
|
}
|
|
if fnf == len(readers) {
|
|
return errFileNotFound
|
|
}
|
|
if agree == len(readers) {
|
|
// Everybody agreed
|
|
for _, r := range readers {
|
|
r.skip(1)
|
|
}
|
|
if opts.agreed != nil {
|
|
opts.agreed(current)
|
|
}
|
|
continue
|
|
}
|
|
if opts.partial != nil {
|
|
opts.partial(topEntries, agree, errs)
|
|
}
|
|
// Skip the inputs we used.
|
|
for i, r := range readers {
|
|
if topEntries[i].name != "" {
|
|
r.skip(1)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|