You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
minio/cmd/xl-zones.go

1597 lines
44 KiB

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"strings"
"sync"
"time"
"github.com/minio/minio-go/v6/pkg/tags"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup"
)
type xlZones struct {
GatewayUnsupported
zones []*xlSets
}
func (z *xlZones) SingleZone() bool {
return len(z.zones) == 1
}
// Initialize new zone of erasure sets.
func newXLZones(ctx context.Context, endpointZones EndpointZones) (ObjectLayer, error) {
var (
deploymentID string
err error
formats = make([]*formatXLV3, len(endpointZones))
storageDisks = make([][]StorageAPI, len(endpointZones))
z = &xlZones{zones: make([]*xlSets, len(endpointZones))}
)
var localDrives []string
local := endpointZones.FirstLocal()
for i, ep := range endpointZones {
for _, endpoint := range ep.Endpoints {
if endpoint.IsLocal {
localDrives = append(localDrives, endpoint.Path)
}
}
storageDisks[i], formats[i], err = waitForFormatXL(local, ep.Endpoints, i+1,
ep.SetCount, ep.DrivesPerSet, deploymentID)
if err != nil {
return nil, err
}
if deploymentID == "" {
deploymentID = formats[i].ID
}
z.zones[i], err = newXLSets(ctx, ep.Endpoints, storageDisks[i], formats[i])
if err != nil {
return nil, err
}
}
go intDataUpdateTracker.start(GlobalContext, localDrives...)
return z, nil
}
func (z *xlZones) NewNSLock(ctx context.Context, bucket string, objects ...string) RWLocker {
return z.zones[0].NewNSLock(ctx, bucket, objects...)
}
type zonesAvailableSpace []zoneAvailableSpace
type zoneAvailableSpace struct {
Index int
Available uint64
}
// TotalAvailable - total available space
func (p zonesAvailableSpace) TotalAvailable() uint64 {
total := uint64(0)
for _, z := range p {
total += z.Available
}
return total
}
func (z *xlZones) getAvailableZoneIdx(ctx context.Context) int {
zones := z.getZonesAvailableSpace(ctx)
total := zones.TotalAvailable()
if total == 0 {
// Houston, we have a problem, maybe panic??
return zones[0].Index
}
// choose when we reach this many
choose := rand.Uint64() % total
atTotal := uint64(0)
for _, zone := range zones {
atTotal += zone.Available
if atTotal > choose && zone.Available > 0 {
return zone.Index
}
}
// Should not happen, but print values just in case.
panic(fmt.Errorf("reached end of zones (total: %v, atTotal: %v, choose: %v)", total, atTotal, choose))
}
func (z *xlZones) getZonesAvailableSpace(ctx context.Context) zonesAvailableSpace {
var zones = make(zonesAvailableSpace, len(z.zones))
storageInfos := make([]StorageInfo, len(z.zones))
g := errgroup.WithNErrs(len(z.zones))
for index := range z.zones {
index := index
g.Go(func() error {
storageInfos[index] = z.zones[index].StorageInfo(ctx, false)
return nil
}, index)
}
// Wait for the go routines.
g.Wait()
for i, zinfo := range storageInfos {
var available uint64
for _, davailable := range zinfo.Available {
available += davailable
}
zones[i] = zoneAvailableSpace{
Index: i,
Available: available,
}
}
return zones
}
func (z *xlZones) Shutdown(ctx context.Context) error {
if z.SingleZone() {
return z.zones[0].Shutdown(ctx)
}
g := errgroup.WithNErrs(len(z.zones))
for index := range z.zones {
index := index
g.Go(func() error {
return z.zones[index].Shutdown(ctx)
}, index)
}
for _, err := range g.Wait() {
if err != nil {
logger.LogIf(ctx, err)
}
// let's the rest shutdown
}
return nil
}
func (z *xlZones) StorageInfo(ctx context.Context, local bool) StorageInfo {
if z.SingleZone() {
return z.zones[0].StorageInfo(ctx, local)
}
var storageInfo StorageInfo
storageInfos := make([]StorageInfo, len(z.zones))
g := errgroup.WithNErrs(len(z.zones))
for index := range z.zones {
index := index
g.Go(func() error {
storageInfos[index] = z.zones[index].StorageInfo(ctx, local)
return nil
}, index)
}
// Wait for the go routines.
g.Wait()
for _, lstorageInfo := range storageInfos {
storageInfo.Used = append(storageInfo.Used, lstorageInfo.Used...)
storageInfo.Total = append(storageInfo.Total, lstorageInfo.Total...)
storageInfo.Available = append(storageInfo.Available, lstorageInfo.Available...)
storageInfo.MountPaths = append(storageInfo.MountPaths, lstorageInfo.MountPaths...)
storageInfo.Backend.OnlineDisks = storageInfo.Backend.OnlineDisks.Merge(lstorageInfo.Backend.OnlineDisks)
storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks.Merge(lstorageInfo.Backend.OfflineDisks)
storageInfo.Backend.Sets = append(storageInfo.Backend.Sets, lstorageInfo.Backend.Sets...)
}
storageInfo.Backend.Type = storageInfos[0].Backend.Type
storageInfo.Backend.StandardSCData = storageInfos[0].Backend.StandardSCData
storageInfo.Backend.StandardSCParity = storageInfos[0].Backend.StandardSCParity
storageInfo.Backend.RRSCData = storageInfos[0].Backend.RRSCData
storageInfo.Backend.RRSCParity = storageInfos[0].Backend.RRSCParity
return storageInfo
}
func (z *xlZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
var mu sync.Mutex
var results []dataUsageCache
var firstErr error
var knownBuckets = make(map[string]struct{}) // used to deduplicate buckets.
var allBuckets []BucketInfo
// Collect for each set in zones.
for _, z := range z.zones {
for _, xlObj := range z.sets {
// Add new buckets.
buckets, err := xlObj.ListBuckets(ctx)
if err != nil {
return err
}
for _, b := range buckets {
if _, ok := knownBuckets[b.Name]; ok {
continue
}
allBuckets = append(allBuckets, b)
knownBuckets[b.Name] = struct{}{}
}
wg.Add(1)
results = append(results, dataUsageCache{})
go func(i int, xl *xlObjects) {
updates := make(chan dataUsageCache, 1)
defer close(updates)
// Start update collector.
go func() {
defer wg.Done()
for info := range updates {
mu.Lock()
results[i] = info
mu.Unlock()
}
}()
// Start crawler. Blocks until done.
err := xl.crawlAndGetDataUsage(ctx, buckets, bf, updates)
if err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err
}
// Cancel remaining...
cancel()
mu.Unlock()
return
}
}(len(results)-1, xlObj)
}
}
updateCloser := make(chan chan struct{})
go func() {
updateTicker := time.NewTicker(30 * time.Second)
defer updateTicker.Stop()
var lastUpdate time.Time
update := func() {
mu.Lock()
defer mu.Unlock()
// We need to merge since we will get the same buckets from each zone.
// Therefore to get the exact bucket sizes we must merge before we can convert.
allMerged := dataUsageCache{Info: dataUsageCacheInfo{Name: dataUsageRoot}}
for _, info := range results {
if info.Info.LastUpdate.IsZero() {
// Not filled yet.
return
}
allMerged.merge(info)
}
if allMerged.root() != nil && allMerged.Info.LastUpdate.After(lastUpdate) {
updates <- allMerged.dui(allMerged.Info.Name, allBuckets)
lastUpdate = allMerged.Info.LastUpdate
}
}
for {
select {
case <-ctx.Done():
return
case v := <-updateCloser:
update()
close(v)
return
case <-updateTicker.C:
update()
}
}
}()
wg.Wait()
ch := make(chan struct{})
updateCloser <- ch
<-ch
return firstErr
}
// MakeBucketWithLocation - creates a new bucket across all zones simultaneously
// even if one of the sets fail to create buckets, we proceed all the successful
// operations.
func (z *xlZones) MakeBucketWithLocation(ctx context.Context, bucket, location string, lockEnabled bool) error {
if z.SingleZone() {
if err := z.zones[0].MakeBucketWithLocation(ctx, bucket, location, lockEnabled); err != nil {
return err
}
// If it doesn't exist we get a new, so ignore errors
meta, _ := globalBucketMetadataSys.Get(bucket)
meta.ObjectLockConfigurationXML = defaultBucketObjectLockConfig
if err := meta.Save(ctx, z); err != nil {
return toObjectErr(err, bucket)
}
globalBucketMetadataSys.Set(bucket, meta)
return nil
}
g := errgroup.WithNErrs(len(z.zones))
// Create buckets in parallel across all sets.
for index := range z.zones {
index := index
g.Go(func() error {
return z.zones[index].MakeBucketWithLocation(ctx, bucket, location, lockEnabled)
}, index)
}
errs := g.Wait()
// Return the first encountered error
for _, err := range errs {
if err != nil {
return err
}
}
// If it doesn't exist we get a new, so ignore errors
meta, _ := globalBucketMetadataSys.Get(bucket)
meta.ObjectLockConfigurationXML = defaultBucketObjectLockConfig
if err := meta.Save(ctx, z); err != nil {
return toObjectErr(err, bucket)
}
globalBucketMetadataSys.Set(bucket, meta)
// Success.
return nil
}
func (z *xlZones) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
var nsUnlocker = func() {}
// Acquire lock
if lockType != noLock {
lock := z.NewNSLock(ctx, bucket, object)
switch lockType {
case writeLock:
if err = lock.GetLock(globalObjectTimeout); err != nil {
return nil, err
}
nsUnlocker = lock.Unlock
case readLock:
if err = lock.GetRLock(globalObjectTimeout); err != nil {
return nil, err
}
nsUnlocker = lock.RUnlock
}
}
for _, zone := range z.zones {
gr, err := zone.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
if err != nil {
if isErrObjectNotFound(err) {
continue
}
nsUnlocker()
return nil, err
}
gr.cleanUpFns = append(gr.cleanUpFns, nsUnlocker)
return gr, nil
}
nsUnlocker()
return nil, ObjectNotFound{Bucket: bucket, Object: object}
}
func (z *xlZones) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
// Lock the object before reading.
lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalObjectTimeout); err != nil {
return err
}
defer lk.RUnlock()
if z.SingleZone() {
return z.zones[0].GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
for _, zone := range z.zones {
if err := zone.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil {
if isErrObjectNotFound(err) {
continue
}
return err
}
return nil
}
return ObjectNotFound{Bucket: bucket, Object: object}
}
func (z *xlZones) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
// Lock the object before reading.
lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalObjectTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.RUnlock()
if z.SingleZone() {
return z.zones[0].GetObjectInfo(ctx, bucket, object, opts)
}
for _, zone := range z.zones {
objInfo, err := zone.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
if isErrObjectNotFound(err) {
continue
}
return objInfo, err
}
return objInfo, nil
}
return ObjectInfo{}, ObjectNotFound{Bucket: bucket, Object: object}
}
// PutObject - writes an object to least used erasure zone.
func (z *xlZones) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) {
// Lock the object.
lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetLock(globalObjectTimeout); err != nil {
return ObjectInfo{}, err
}
defer lk.Unlock()
if z.SingleZone() {
return z.zones[0].PutObject(ctx, bucket, object, data, opts)
}
for _, zone := range z.zones {
objInfo, err := zone.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
if isErrObjectNotFound(err) {
continue
}
return objInfo, err
}
// Overwrite request upload to right zone.
return zone.PutObject(ctx, bucket, object, data, opts)
}
// Object not found pick the least used and upload to this zone.
return z.zones[z.getAvailableZoneIdx(ctx)].PutObject(ctx, bucket, object, data, opts)
}
func (z *xlZones) DeleteObject(ctx context.Context, bucket string, object string) error {
// Acquire a write lock before deleting the object.
lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetLock(globalOperationTimeout); err != nil {
return err
}
defer lk.Unlock()
if z.SingleZone() {
return z.zones[0].DeleteObject(ctx, bucket, object)
}
for _, zone := range z.zones {
err := zone.DeleteObject(ctx, bucket, object)
if err != nil && !isErrObjectNotFound(err) {
return err
}
}
return nil
}
func (z *xlZones) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
derrs := make([]error, len(objects))
for i := range derrs {
derrs[i] = checkDelObjArgs(ctx, bucket, objects[i])
}
// Acquire a bulk write lock across 'objects'
multiDeleteLock := z.NewNSLock(ctx, bucket, objects...)
if err := multiDeleteLock.GetLock(globalOperationTimeout); err != nil {
return nil, err
}
defer multiDeleteLock.Unlock()
for _, zone := range z.zones {
errs, err := zone.DeleteObjects(ctx, bucket, objects)
if err != nil {
return nil, err
}
for i, derr := range errs {
if derrs[i] == nil {
if derr != nil && !isErrObjectNotFound(derr) {
derrs[i] = derr
}
}
}
}
return derrs, nil
}
func (z *xlZones) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
// Check if this request is only metadata update.
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject))
if !cpSrcDstSame {
lk := z.NewNSLock(ctx, destBucket, destObject)
if err := lk.GetLock(globalObjectTimeout); err != nil {
return objInfo, err
}
defer lk.Unlock()
}
if z.SingleZone() {
return z.zones[0].CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
}
if cpSrcDstSame && srcInfo.metadataOnly {
for _, zone := range z.zones {
objInfo, err = zone.CopyObject(ctx, srcBucket, srcObject, destBucket,
destObject, srcInfo, srcOpts, dstOpts)
if err != nil {
if isErrObjectNotFound(err) {
continue
}
return objInfo, err
}
return objInfo, nil
}
return objInfo, ObjectNotFound{Bucket: srcBucket, Object: srcObject}
}
return z.zones[z.getAvailableZoneIdx(ctx)].CopyObject(ctx, srcBucket, srcObject,
destBucket, destObject, srcInfo, srcOpts, dstOpts)
}
func (z *xlZones) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
if z.SingleZone() {
return z.zones[0].ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter)
}
marker := continuationToken
if marker == "" {
marker = startAfter
}
loi, err := z.ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return ListObjectsV2Info{}, err
}
listObjectsV2Info := ListObjectsV2Info{
IsTruncated: loi.IsTruncated,
ContinuationToken: continuationToken,
NextContinuationToken: loi.NextMarker,
Objects: loi.Objects,
Prefixes: loi.Prefixes,
}
return listObjectsV2Info, err
}
func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
var zonesEntryChs [][]FileInfoCh
endWalkCh := make(chan struct{})
defer close(endWalkCh)
const ndisks = 3
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, ndisks))
}
var objInfos []ObjectInfo
var eof bool
var prevPrefix string
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
for _, entryChs := range zonesEntryChs {
zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs)))
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
}
for {
if len(objInfos) == maxKeys {
break
}
result, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
eof = true
break
}
if quorumCount < ndisks-1 {
// Skip entries which are not found on upto ndisks.
continue
}
var objInfo ObjectInfo
index := strings.Index(strings.TrimPrefix(result.Name, prefix), delimiter)
if index == -1 {
objInfo = ObjectInfo{
IsDir: false,
Bucket: bucket,
Name: result.Name,
ModTime: result.ModTime,
Size: result.Size,
ContentType: result.Metadata["content-type"],
ContentEncoding: result.Metadata["content-encoding"],
}
// Extract etag from metadata.
objInfo.ETag = extractETag(result.Metadata)
// All the parts per object.
objInfo.Parts = result.Parts
// etag/md5Sum has already been extracted. We need to
// remove to avoid it from appearing as part of
// response headers. e.g, X-Minio-* or X-Amz-*.
objInfo.UserDefined = cleanMetadata(result.Metadata)
// Update storage class
if sc, ok := result.Metadata[xhttp.AmzStorageClass]; ok {
objInfo.StorageClass = sc
} else {
objInfo.StorageClass = globalMinioDefaultStorageClass
}
} else {
index = len(prefix) + index + len(delimiter)
currPrefix := result.Name[:index]
if currPrefix == prevPrefix {
continue
}
prevPrefix = currPrefix
objInfo = ObjectInfo{
Bucket: bucket,
Name: currPrefix,
IsDir: true,
}
}
if objInfo.Name <= marker {
continue
}
objInfos = append(objInfos, objInfo)
}
result := ListObjectsInfo{}
for _, objInfo := range objInfos {
if objInfo.IsDir {
result.Prefixes = append(result.Prefixes, objInfo.Name)
continue
}
result.Objects = append(result.Objects, objInfo)
}
if !eof {
result.IsTruncated = true
if len(objInfos) > 0 {
result.NextMarker = objInfos[len(objInfos)-1].Name
}
}
return result, nil
}
func (z *xlZones) listObjectsSplunk(ctx context.Context, bucket, prefix, marker string, maxKeys int) (loi ListObjectsInfo, err error) {
if strings.Contains(prefix, guidSplunk) {
logger.LogIf(ctx, NotImplemented{})
return loi, NotImplemented{}
}
recursive := true
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
const ndisks = 3
for _, zone := range z.zones {
entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, ndisks)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, ndisks)
if len(entries.Files) == 0 {
return loi, nil
}
loi.IsTruncated = entries.IsTruncated
if loi.IsTruncated {
loi.NextMarker = entries.Files[len(entries.Files)-1].Name
}
for _, entry := range entries.Files {
objInfo := entry.ToObjectInfo()
splits := strings.Split(objInfo.Name, guidSplunk)
if len(splits) == 0 {
loi.Objects = append(loi.Objects, objInfo)
continue
}
loi.Prefixes = append(loi.Prefixes, splits[0]+guidSplunk)
}
if loi.IsTruncated {
for i, zone := range z.zones {
zone.poolSplunk.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, zonesEntryChs[i],
zonesEndWalkCh[i])
}
}
return loi, nil
}
func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
loi := ListObjectsInfo{}
if err := checkListObjsArgs(ctx, bucket, prefix, marker, z); err != nil {
return loi, err
}
// Marker is set validate pre-condition.
if marker != "" {
// Marker not common with prefix is not implemented. Send an empty response
if !HasPrefix(marker, prefix) {
return loi, nil
}
}
// With max keys of zero we have reached eof, return right here.
if maxKeys == 0 {
return loi, nil
}
// For delimiter and prefix as '/' we do not list anything at all
// since according to s3 spec we stop at the 'delimiter'
// along // with the prefix. On a flat namespace with 'prefix'
// as '/' we don't have any entries, since all the keys are
// of form 'keyName/...'
if delimiter == SlashSeparator && prefix == SlashSeparator {
return loi, nil
}
// Over flowing count - reset to maxObjectList.
if maxKeys < 0 || maxKeys > maxObjectList {
maxKeys = maxObjectList
}
if delimiter != SlashSeparator && delimiter != "" {
if delimiter == guidSplunk {
return z.listObjectsSplunk(ctx, bucket, prefix, marker, maxKeys)
}
return z.listObjectsNonSlash(ctx, bucket, prefix, marker, delimiter, maxKeys)
}
// Default is recursive, if delimiter is set then list non recursive.
recursive := true
if delimiter == SlashSeparator {
recursive = false
}
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
const ndisks = 3
for _, zone := range z.zones {
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, ndisks)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, ndisks)
if len(entries.Files) == 0 {
return loi, nil
}
loi.IsTruncated = entries.IsTruncated
if loi.IsTruncated {
loi.NextMarker = entries.Files[len(entries.Files)-1].Name
}
for _, entry := range entries.Files {
objInfo := entry.ToObjectInfo()
if HasSuffix(objInfo.Name, SlashSeparator) && !recursive {
loi.Prefixes = append(loi.Prefixes, objInfo.Name)
continue
}
loi.Objects = append(loi.Objects, objInfo)
}
if loi.IsTruncated {
for i, zone := range z.zones {
zone.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, zonesEntryChs[i],
zonesEndWalkCh[i])
}
}
return loi, nil
}
// Calculate least entry across zones and across multiple FileInfo
// channels, returns the least common entry and the total number of times
// we found this entry. Additionally also returns a boolean
// to indicate if the caller needs to call this function
// again to list the next entry. It is callers responsibility
// if the caller wishes to list N entries to call leastEntry
// N times until this boolean is 'false'.
func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
for i, entryChs := range zoneEntryChs {
for j := range entryChs {
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}
}
var isTruncated = false
for _, entriesValid := range zoneEntriesValid {
for _, valid := range entriesValid {
if !valid {
continue
}
isTruncated = true
break
}
if isTruncated {
break
}
}
var lentry FileInfo
var found bool
var zoneIndex = -1
for i, entriesValid := range zoneEntriesValid {
for j, valid := range entriesValid {
if !valid {
continue
}
if !found {
lentry = zoneEntries[i][j]
found = true
zoneIndex = i
continue
}
if zoneEntries[i][j].Name < lentry.Name {
lentry = zoneEntries[i][j]
zoneIndex = i
}
}
}
// We haven't been able to find any least entry,
// this would mean that we don't have valid entry.
if !found {
return lentry, 0, zoneIndex, isTruncated
}
leastEntryCount := 0
for i, entriesValid := range zoneEntriesValid {
for j, valid := range entriesValid {
if !valid {
continue
}
// Entries are duplicated across disks,
// we should simply skip such entries.
if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) {
leastEntryCount++
continue
}
// Push all entries which are lexically higher
// and will be returned later in Pop()
zoneEntryChs[i][j].Push(zoneEntries[i][j])
}
}
return lentry, leastEntryCount, zoneIndex, isTruncated
}
// mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys.
func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int) (entries FilesInfo) {
var i = 0
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
for _, entryChs := range zonesEntryChs {
zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs)))
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
}
for {
fi, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
// We have reached EOF across all entryChs, break the loop.
break
}
if quorumCount < ndisks-1 {
// Skip entries which are not found on upto ndisks.
continue
}
entries.Files = append(entries.Files, fi)
i++
if i == maxKeys {
entries.IsTruncated = isTruncatedZones(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
break
}
}
return entries
}
func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) bool {
for i, entryChs := range zoneEntryChs {
for j := range entryChs {
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}
}
var isTruncated = false
for _, entriesValid := range zoneEntriesValid {
for _, valid := range entriesValid {
if !valid {
continue
}
isTruncated = true
break
}
if isTruncated {
break
}
}
for i, entryChs := range zoneEntryChs {
for j := range entryChs {
if zoneEntriesValid[i][j] {
zoneEntryChs[i][j].Push(zoneEntries[i][j])
}
}
}
return isTruncated
}
func (z *xlZones) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
if z.SingleZone() {
return z.zones[0].ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
}
return z.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
}
func (z *xlZones) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
if err := checkListMultipartArgs(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, z); err != nil {
return ListMultipartsInfo{}, err
}
if z.SingleZone() {
return z.zones[0].ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
var zoneResult = ListMultipartsInfo{}
zoneResult.MaxUploads = maxUploads
zoneResult.KeyMarker = keyMarker
zoneResult.Prefix = prefix
zoneResult.Delimiter = delimiter
for _, zone := range z.zones {
result, err := zone.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker,
delimiter, maxUploads)
if err != nil {
return result, err
}
zoneResult.Uploads = append(zoneResult.Uploads, result.Uploads...)
}
return zoneResult, nil
}
// Initiate a new multipart upload on a hashedSet based on object name.
func (z *xlZones) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
if err := checkNewMultipartArgs(ctx, bucket, object, z); err != nil {
return "", err
}
if z.SingleZone() {
return z.zones[0].NewMultipartUpload(ctx, bucket, object, opts)
}
return z.zones[z.getAvailableZoneIdx(ctx)].NewMultipartUpload(ctx, bucket, object, opts)
}
// Copies a part of an object from source hashedSet to destination hashedSet.
func (z *xlZones) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (PartInfo, error) {
if err := checkNewMultipartArgs(ctx, srcBucket, srcObject, z); err != nil {
return PartInfo{}, err
}
return z.PutObjectPart(ctx, destBucket, destObject, uploadID, partID,
NewPutObjReader(srcInfo.Reader, nil, nil), dstOpts)
}
// PutObjectPart - writes part of an object to hashedSet based on the object name.
func (z *xlZones) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (PartInfo, error) {
if err := checkPutObjectPartArgs(ctx, bucket, object, z); err != nil {
return PartInfo{}, err
}
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return PartInfo{}, err
}
defer uploadIDLock.Unlock()
if z.SingleZone() {
return z.zones[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
}
for _, zone := range z.zones {
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxObjectList)
if err != nil {
return PartInfo{}, err
}
if result.Lookup(uploadID) {
return zone.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
}
}
return PartInfo{}, InvalidUploadID{
Bucket: bucket,
Object: object,
UploadID: uploadID,
}
}
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
func (z *xlZones) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (ListPartsInfo, error) {
if err := checkListPartsArgs(ctx, bucket, object, z); err != nil {
return ListPartsInfo{}, err
}
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return ListPartsInfo{}, err
}
defer uploadIDLock.RUnlock()
if z.SingleZone() {
return z.zones[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
}
for _, zone := range z.zones {
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxObjectList)
if err != nil {
return ListPartsInfo{}, err
}
if result.Lookup(uploadID) {
return zone.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
}
}
return ListPartsInfo{}, InvalidUploadID{
Bucket: bucket,
Object: object,
UploadID: uploadID,
}
}
// Aborts an in-progress multipart operation on hashedSet based on the object name.
func (z *xlZones) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
if err := checkAbortMultipartArgs(ctx, bucket, object, z); err != nil {
return err
}
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer uploadIDLock.Unlock()
if z.SingleZone() {
return z.zones[0].AbortMultipartUpload(ctx, bucket, object, uploadID)
}
for _, zone := range z.zones {
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxObjectList)
if err != nil {
return err
}
if result.Lookup(uploadID) {
return zone.AbortMultipartUpload(ctx, bucket, object, uploadID)
}
}
return InvalidUploadID{
Bucket: bucket,
Object: object,
UploadID: uploadID,
}
}
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
func (z *xlZones) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if err = checkCompleteMultipartArgs(ctx, bucket, object, z); err != nil {
return objInfo, err
}
// Hold read-locks to verify uploaded parts, also disallows
// parallel part uploads as well.
uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID))
if err = uploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return objInfo, err
}
defer uploadIDLock.RUnlock()
// Hold namespace to complete the transaction, only hold
// if uploadID can be held exclusively.
lk := z.NewNSLock(ctx, bucket, object)
if err = lk.GetLock(globalOperationTimeout); err != nil {
return objInfo, err
}
defer lk.Unlock()
if z.SingleZone() {
return z.zones[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
}
// Purge any existing object.
for _, zone := range z.zones {
zone.DeleteObject(ctx, bucket, object)
}
for _, zone := range z.zones {
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxObjectList)
if err != nil {
return objInfo, err
}
if result.Lookup(uploadID) {
return zone.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
}
}
return objInfo, InvalidUploadID{
Bucket: bucket,
Object: object,
UploadID: uploadID,
}
}
// GetBucketInfo - returns bucket info from one of the erasure coded zones.
func (z *xlZones) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
if z.SingleZone() {
bucketInfo, err = z.zones[0].GetBucketInfo(ctx, bucket)
if err != nil {
return bucketInfo, err
}
meta, err := globalBucketMetadataSys.Get(bucket)
if err == nil {
bucketInfo.Created = meta.Created
}
return bucketInfo, nil
}
for _, zone := range z.zones {
bucketInfo, err = zone.GetBucketInfo(ctx, bucket)
if err != nil {
if isErrBucketNotFound(err) {
continue
}
return bucketInfo, err
}
meta, err := globalBucketMetadataSys.Get(bucket)
if err == nil {
bucketInfo.Created = meta.Created
}
return bucketInfo, nil
}
return bucketInfo, BucketNotFound{
Bucket: bucket,
}
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
func (z *xlZones) IsNotificationSupported() bool {
return true
}
// IsListenBucketSupported returns whether listen bucket notification is applicable for this layer.
func (z *xlZones) IsListenBucketSupported() bool {
return true
}
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
func (z *xlZones) IsEncryptionSupported() bool {
return true
}
// IsCompressionSupported returns whether compression is applicable for this layer.
func (z *xlZones) IsCompressionSupported() bool {
return true
}
// DeleteBucket - deletes a bucket on all zones simultaneously,
// even if one of the zones fail to delete buckets, we proceed to
// undo a successful operation.
func (z *xlZones) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
if z.SingleZone() {
return z.zones[0].DeleteBucket(ctx, bucket, forceDelete)
}
g := errgroup.WithNErrs(len(z.zones))
// Delete buckets in parallel across all zones.
for index := range z.zones {
index := index
g.Go(func() error {
return z.zones[index].DeleteBucket(ctx, bucket, forceDelete)
}, index)
}
errs := g.Wait()
// For any write quorum failure, we undo all the delete
// buckets operation by creating all the buckets again.
for _, err := range errs {
if err != nil {
if _, ok := err.(InsufficientWriteQuorum); ok {
undoDeleteBucketZones(bucket, z.zones, errs)
}
return err
}
}
// Success.
return nil
}
// This function is used to undo a successful DeleteBucket operation.
func undoDeleteBucketZones(bucket string, zones []*xlSets, errs []error) {
g := errgroup.WithNErrs(len(zones))
// Undo previous delete bucket on all underlying zones.
for index := range zones {
index := index
g.Go(func() error {
if errs[index] == nil {
return zones[index].MakeBucketWithLocation(GlobalContext, bucket, "", false)
}
return nil
}, index)
}
g.Wait()
}
// List all buckets from one of the zones, we are not doing merge
// sort here just for simplification. As per design it is assumed
// that all buckets are present on all zones.
func (z *xlZones) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
if z.SingleZone() {
buckets, err = z.zones[0].ListBuckets(ctx)
} else {
for _, zone := range z.zones {
buckets, err = zone.ListBuckets(ctx)
if err != nil {
logger.LogIf(ctx, err)
continue
}
break
}
}
if err != nil {
return nil, err
}
for i := range buckets {
meta, err := globalBucketMetadataSys.Get(buckets[i].Name)
if err == nil {
buckets[i].Created = meta.Created
}
globalBucketMetadataSys.Set(buckets[i].Name, meta)
}
return buckets, nil
}
func (z *xlZones) ReloadFormat(ctx context.Context, dryRun bool) error {
// Acquire lock on format.json
formatLock := z.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
if err := formatLock.GetRLock(globalHealingTimeout); err != nil {
return err
}
defer formatLock.RUnlock()
for _, zone := range z.zones {
if err := zone.ReloadFormat(ctx, dryRun); err != nil {
return err
}
}
return nil
}
func (z *xlZones) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
// Acquire lock on format.json
formatLock := z.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
if err := formatLock.GetLock(globalHealingTimeout); err != nil {
return madmin.HealResultItem{}, err
}
defer formatLock.Unlock()
var r = madmin.HealResultItem{
Type: madmin.HealItemMetadata,
Detail: "disk-format",
}
var countNoHeal int
for _, zone := range z.zones {
result, err := zone.HealFormat(ctx, dryRun)
if err != nil && err != errNoHealRequired {
logger.LogIf(ctx, err)
continue
}
// Count errNoHealRequired across all zones,
// to return appropriate error to the caller
if err == errNoHealRequired {
countNoHeal++
}
r.DiskCount += result.DiskCount
r.SetCount += result.SetCount
r.Before.Drives = append(r.Before.Drives, result.Before.Drives...)
r.After.Drives = append(r.After.Drives, result.After.Drives...)
}
// No heal returned by all zones, return errNoHealRequired
if countNoHeal == len(z.zones) {
return r, errNoHealRequired
}
return r, nil
}
func (z *xlZones) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) {
var r = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
}
for _, zone := range z.zones {
result, err := zone.HealBucket(ctx, bucket, dryRun, remove)
if err != nil {
switch err.(type) {
case BucketNotFound:
continue
}
return result, err
}
r.DiskCount += result.DiskCount
r.SetCount += result.SetCount
r.Before.Drives = append(r.Before.Drives, result.Before.Drives...)
r.After.Drives = append(r.After.Drives, result.After.Drives...)
}
return r, nil
}
// Walk a bucket, optionally prefix recursively, until we have returned
// all the content to objectInfo channel, it is callers responsibility
// to allocate a receive channel for ObjectInfo, upon any unhandled
// error walker returns error. Optionally if context.Done() is received
// then Walk() stops the walker.
func (z *xlZones) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil {
// Upon error close the channel.
close(results)
return err
}
var zonesEntryChs [][]FileInfoCh
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done()))
}
var zoneDrivesPerSet []int
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
}
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
for _, entryChs := range zonesEntryChs {
zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs)))
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
}
go func() {
defer close(results)
for {
entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs,
zonesEntriesInfos, zonesEntriesValid)
if !ok {
return
}
if quorumCount >= zoneDrivesPerSet[zoneIndex]/2 {
results <- entry.ToObjectInfo() // Read quorum exists proceed
}
// skip entries which do not have quorum
}
}()
return nil
}
type healObjectFn func(string, string) error
func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject healObjectFn) error {
var zonesEntryChs [][]FileInfoCh
endWalkCh := make(chan struct{})
defer close(endWalkCh)
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh))
}
var zoneDrivesPerSet []int
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
}
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
for _, entryChs := range zonesEntryChs {
zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs)))
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
}
for {
entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
break
}
if quorumCount == zoneDrivesPerSet[zoneIndex] && opts.ScanMode == madmin.HealNormalScan {
// Skip good entries.
continue
}
// Wait and proceed if there are active requests
waitForLowHTTPReq(int32(zoneDrivesPerSet[zoneIndex]))
if err := healObject(bucket, entry.Name); err != nil {
return toObjectErr(err, bucket, entry.Name)
}
}
return nil
}
func (z *xlZones) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Lock the object before healing. Use read lock since healing
// will only regenerate parts & xl.json of outdated disks.
lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalHealingTimeout); err != nil {
return madmin.HealResultItem{}, err
}
defer lk.RUnlock()
if z.SingleZone() {
return z.zones[0].HealObject(ctx, bucket, object, opts)
}
for _, zone := range z.zones {
result, err := zone.HealObject(ctx, bucket, object, opts)
if err != nil {
if isErrObjectNotFound(err) {
continue
}
return result, err
}
return result, nil
}
return madmin.HealResultItem{}, ObjectNotFound{
Bucket: bucket,
Object: object,
}
}
func (z *xlZones) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
var healBuckets []BucketInfo
for _, zone := range z.zones {
bucketsInfo, err := zone.ListBucketsHeal(ctx)
if err != nil {
continue
}
healBuckets = append(healBuckets, bucketsInfo...)
}
for i := range healBuckets {
meta, err := globalBucketMetadataSys.Get(healBuckets[i].Name)
if err == nil {
healBuckets[i].Created = meta.Created
}
globalBucketMetadataSys.Set(healBuckets[i].Name, meta)
}
return healBuckets, nil
}
// GetMetrics - no op
func (z *xlZones) GetMetrics(ctx context.Context) (*Metrics, error) {
logger.LogIf(ctx, NotImplemented{})
return &Metrics{}, NotImplemented{}
}
// IsReady - Returns true if first zone returns true
func (z *xlZones) IsReady(ctx context.Context) bool {
return z.zones[0].IsReady(ctx)
}
// PutObjectTag - replace or add tags to an existing object
func (z *xlZones) PutObjectTag(ctx context.Context, bucket, object string, tags string) error {
if z.SingleZone() {
return z.zones[0].PutObjectTag(ctx, bucket, object, tags)
}
for _, zone := range z.zones {
err := zone.PutObjectTag(ctx, bucket, object, tags)
if err != nil {
if isErrBucketNotFound(err) {
continue
}
return err
}
return nil
}
return BucketNotFound{
Bucket: bucket,
}
}
// DeleteObjectTag - delete object tags from an existing object
func (z *xlZones) DeleteObjectTag(ctx context.Context, bucket, object string) error {
if z.SingleZone() {
return z.zones[0].DeleteObjectTag(ctx, bucket, object)
}
for _, zone := range z.zones {
err := zone.DeleteObjectTag(ctx, bucket, object)
if err != nil {
if isErrBucketNotFound(err) {
continue
}
return err
}
return nil
}
return BucketNotFound{
Bucket: bucket,
}
}
// GetObjectTag - get object tags from an existing object
func (z *xlZones) GetObjectTag(ctx context.Context, bucket, object string) (*tags.Tags, error) {
if z.SingleZone() {
return z.zones[0].GetObjectTag(ctx, bucket, object)
}
for _, zone := range z.zones {
tags, err := zone.GetObjectTag(ctx, bucket, object)
if err != nil {
if isErrBucketNotFound(err) {
continue
}
return tags, err
}
return tags, nil
}
return nil, BucketNotFound{
Bucket: bucket,
}
}