|
|
|
@ -30,6 +30,7 @@ import ( |
|
|
|
|
"github.com/minio/minio/pkg/bucket/policy" |
|
|
|
|
"github.com/minio/minio/pkg/event" |
|
|
|
|
"github.com/minio/minio/pkg/madmin" |
|
|
|
|
"github.com/minio/minio/pkg/sync/errgroup" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// BucketMetadataSys captures all bucket metadata for a given cluster.
|
|
|
|
@ -291,21 +292,44 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob |
|
|
|
|
return sys.load(ctx, buckets, objAPI) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Loads bucket metadata for all buckets into BucketMetadataSys.
|
|
|
|
|
func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { |
|
|
|
|
sys.Lock() |
|
|
|
|
defer sys.Unlock() |
|
|
|
|
|
|
|
|
|
for _, bucket := range buckets { |
|
|
|
|
meta, err := loadBucketMetadata(ctx, objAPI, bucket.Name) |
|
|
|
|
// concurrently load bucket metadata to speed up loading bucket metadata.
|
|
|
|
|
func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { |
|
|
|
|
g := errgroup.WithNErrs(len(buckets)) |
|
|
|
|
for index := range buckets { |
|
|
|
|
index := index |
|
|
|
|
g.Go(func() error { |
|
|
|
|
meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
sys.Lock() |
|
|
|
|
sys.metadataMap[buckets[index].Name] = meta |
|
|
|
|
sys.Unlock() |
|
|
|
|
return nil |
|
|
|
|
}, index) |
|
|
|
|
} |
|
|
|
|
for _, err := range g.Wait() { |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
sys.metadataMap[bucket.Name] = meta |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Loads bucket metadata for all buckets into BucketMetadataSys.
|
|
|
|
|
func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { |
|
|
|
|
count := 100 // load 100 bucket metadata at a time.
|
|
|
|
|
for { |
|
|
|
|
if len(buckets) < count { |
|
|
|
|
return sys.concurrentLoad(ctx, buckets, objAPI) |
|
|
|
|
} |
|
|
|
|
if err := sys.concurrentLoad(ctx, buckets[:count], objAPI); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
buckets = buckets[count:] |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewBucketMetadataSys - creates new policy system.
|
|
|
|
|
func NewBucketMetadataSys() *BucketMetadataSys { |
|
|
|
|
return &BucketMetadataSys{ |
|
|
|
|