listobjectversions: Add shortcut for Veeam blocks (#10893)

Add shortcut for `APN/1.0 Veeam/1.0 Backup/10.0`

It requests unique blocks with a specific prefix. We skip 
scanning the parent directory for more objects matching the prefix.
master
Klaus Post 4 years ago committed by GitHub
parent 17a5ff51ff
commit b5a3d79bce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      cmd/erasure-server-sets.go
  2. 4
      cmd/metacache-server-sets.go
  3. 9
      cmd/metacache-set.go
  4. 10
      cmd/metacache-walk.go

@ -24,6 +24,7 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -662,7 +663,8 @@ func (z *erasureServerSets) ListObjectVersions(ctx context.Context, bucket, pref
if marker == "" && versionMarker != "" { if marker == "" && versionMarker != "" {
return loi, NotImplemented{} return loi, NotImplemented{}
} }
merged, err := z.listPath(ctx, listPathOptions{
opts := listPathOptions{
Bucket: bucket, Bucket: bucket,
Prefix: prefix, Prefix: prefix,
Separator: delimiter, Separator: delimiter,
@ -670,7 +672,19 @@ func (z *erasureServerSets) ListObjectVersions(ctx context.Context, bucket, pref
Marker: marker, Marker: marker,
InclDeleted: true, InclDeleted: true,
AskDisks: globalAPIConfig.getListQuorum(), AskDisks: globalAPIConfig.getListQuorum(),
}) }
// Shortcut for APN/1.0 Veeam/1.0 Backup/10.0
// It requests unique blocks with a specific prefix.
// We skip scanning the parent directory for
// more objects matching the prefix.
ri := logger.GetReqInfo(ctx)
if ri != nil && strings.Contains(ri.UserAgent, `1.0 Veeam/1.0 Backup`) && strings.HasSuffix(prefix, ".blk") {
opts.singleObject = true
opts.Transient = true
}
merged, err := z.listPath(ctx, opts)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return loi, err return loi, err
} }

@ -83,6 +83,10 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
o.ID = mustGetUUID() o.ID = mustGetUUID()
} }
o.BaseDir = baseDirFromPrefix(o.Prefix) o.BaseDir = baseDirFromPrefix(o.Prefix)
if o.singleObject {
// Override for single object.
o.BaseDir = o.Prefix
}
var cache metacache var cache metacache
// If we don't have a list id we must ask the server if it has a cache or create a new. // If we don't have a list id we must ask the server if it has a cache or create a new.

@ -87,6 +87,9 @@ type listPathOptions struct {
// This means the cache metadata will not be persisted on disk. // This means the cache metadata will not be persisted on disk.
// A transient result will never be returned from the cache so knowing the list id is required. // A transient result will never be returned from the cache so knowing the list id is required.
Transient bool Transient bool
// singleObject will assume that prefix refers to an exact single object.
singleObject bool
} }
func init() { func init() {
@ -542,7 +545,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
console.Printf("listPath with options: %#v\n", o) console.Printf("listPath with options: %#v\n", o)
} }
// See if we have the listing stored. // See if we have the listing stored.
if !o.Create { if !o.Create && !o.singleObject {
entries, err := er.streamMetadataParts(ctx, o) entries, err := er.streamMetadataParts(ctx, o)
switch err { switch err {
case nil, io.EOF, context.Canceled, context.DeadlineExceeded: case nil, io.EOF, context.Canceled, context.DeadlineExceeded:
@ -662,6 +665,10 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
// Write results to disk. // Write results to disk.
bw := newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error { bw := newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
if o.singleObject {
// Don't save single object listings.
return nil
}
if debugPrint { if debugPrint {
console.Println("listPath: saving block", b.n, "to", o.objectPath(b.n)) console.Println("listPath: saving block", b.n, "to", o.objectPath(b.n))
} }

@ -116,13 +116,14 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// If root was an object return it as such. // If root was an object return it as such.
if HasSuffix(entry, xlStorageFormatFile) { if HasSuffix(entry, xlStorageFormatFile) {
var meta metaCacheEntry var meta metaCacheEntry
meta.metadata, err = ioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFile)) meta.metadata, err = ioutil.ReadFile(pathJoin(volumeDir, current, entry))
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
} }
meta.name = strings.TrimSuffix(meta.name, xlStorageFormatFile) meta.name = strings.TrimSuffix(entry, xlStorageFormatFile)
meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
meta.name = pathJoin(current, meta.name)
meta.name = decodeDirObject(meta.name) meta.name = decodeDirObject(meta.name)
out <- meta out <- meta
return nil return nil
@ -130,13 +131,14 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Check legacy. // Check legacy.
if HasSuffix(entry, xlStorageFormatFileV1) { if HasSuffix(entry, xlStorageFormatFileV1) {
var meta metaCacheEntry var meta metaCacheEntry
meta.metadata, err = ioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1)) meta.metadata, err = ioutil.ReadFile(pathJoin(volumeDir, current, entry))
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
} }
meta.name = strings.TrimSuffix(meta.name, xlStorageFormatFileV1) meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1)
meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
meta.name = pathJoin(current, meta.name)
out <- meta out <- meta
return nil return nil
} }

Loading…
Cancel
Save