From 9c90a28546aac509a689f8001f415caac5e7a0fc Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Mon, 13 May 2019 20:25:49 +0100 Subject: [PATCH] Implement bulk delete (#7607) Bulk delete at storage level in Multiple Delete Objects API In order to accelerate bulk delete in the Multiple Delete Objects API, a new bulk delete is introduced in the storage layer, which will accept a list of objects to delete rather than only one. Consequently, a new API also needs to be added to the Object API. --- cmd/bucket-handlers.go | 33 ++++- cmd/bucket-handlers_test.go | 2 +- cmd/disk-cache.go | 18 +++ cmd/fs-v1.go | 10 ++ cmd/gateway/azure/gateway-azure.go | 8 ++ cmd/gateway/b2/gateway-b2.go | 8 ++ cmd/gateway/gcs/gateway-gcs.go | 8 ++ cmd/gateway/hdfs/gateway-hdfs.go | 8 ++ cmd/gateway/oss/gateway-oss.go | 8 ++ cmd/gateway/s3/gateway-s3.go | 8 ++ cmd/naughty-disk_test.go | 8 ++ cmd/object-api-common.go | 72 +++++++++++ cmd/object-api-interface.go | 1 + cmd/posix.go | 8 ++ cmd/storage-interface.go | 1 + cmd/storage-rest-client.go | 28 +++++ cmd/storage-rest-common.go | 1 + cmd/storage-rest-server.go | 37 ++++++ cmd/test-utils_test.go | 4 + cmd/utils.go | 7 ++ cmd/xl-sets.go | 59 ++++++++- cmd/xl-v1-object.go | 193 +++++++++++++++++++++++++++++ cmd/xl-v1-object_test.go | 74 +++++++++++ 23 files changed, 597 insertions(+), 7 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 74c6c0b7d..84e43b54f 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -309,12 +309,19 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, return } - deleteObject := objectAPI.DeleteObject + deleteObjectsFn := objectAPI.DeleteObjects if api.CacheAPI() != nil { - deleteObject = api.CacheAPI().DeleteObject + deleteObjectsFn = api.CacheAPI().DeleteObjects } + type delObj struct { + origIndex int + name string + } + + var objectsToDelete []delObj var dErrs = make([]APIErrorCode, len(deleteObjects.Objects)) + for index, object := range 
deleteObjects.Objects { if dErrs[index] = checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, object.ObjectName); dErrs[index] != ErrNone { if dErrs[index] == ErrSignatureDoesNotMatch || dErrs[index] == ErrInvalidAccessKeyID { @@ -323,10 +330,26 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } continue } - err := deleteObject(ctx, bucket, object.ObjectName) - if err != nil { - dErrs[index] = toAPIErrorCode(ctx, err) + + objectsToDelete = append(objectsToDelete, delObj{index, object.ObjectName}) + } + + toNames := func(input []delObj) (output []string) { + output = make([]string, len(input)) + for i := range input { + output[i] = input[i].name } + return + } + + errs, err := deleteObjectsFn(ctx, bucket, toNames(objectsToDelete)) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + for i, obj := range objectsToDelete { + dErrs[obj.origIndex] = toAPIErrorCode(ctx, errs[i]) } // Collect deleted objects and errors if any. diff --git a/cmd/bucket-handlers_test.go b/cmd/bucket-handlers_test.go index afff923cd..78ae30dee 100644 --- a/cmd/bucket-handlers_test.go +++ b/cmd/bucket-handlers_test.go @@ -762,7 +762,7 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa apiRouter.ServeHTTP(rec, req) // Assert the response code with the expected status. if rec.Code != testCase.expectedRespStatus { - t.Errorf("Case %d: MinIO %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, rec.Code) + t.Errorf("Test %d: MinIO %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, rec.Code) } // read the response body. 
diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 27faf9e44..ed84bf6cb 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -62,6 +62,7 @@ type cacheObjects struct { GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) DeleteObjectFn func(ctx context.Context, bucket, object string) error + DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error) ListObjectsFn func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) ListObjectsV2Fn func(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) ListBucketsFn func(ctx context.Context) (buckets []BucketInfo, err error) @@ -94,6 +95,7 @@ type CacheObjectLayer interface { GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) DeleteObject(ctx context.Context, bucket, object string) error + DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) // Multipart operations. 
NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) @@ -629,6 +631,14 @@ func (c cacheObjects) DeleteObject(ctx context.Context, bucket, object string) ( return } +func (c cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = c.DeleteObject(ctx, bucket, object) + } + return errs, nil +} + // Returns true if object should be excluded from cache func (c cacheObjects) isCacheExclude(bucket, object string) bool { for _, pattern := range c.exclude { @@ -974,6 +984,14 @@ func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) { DeleteObjectFn: func(ctx context.Context, bucket, object string) error { return newObjectLayerFn().DeleteObject(ctx, bucket, object) }, + DeleteObjectsFn: func(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = newObjectLayerFn().DeleteObject(ctx, bucket, object) + } + return errs, nil + }, + ListObjectsFn: func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { return newObjectLayerFn().ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys) }, diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index e24af39c5..9fc79f8cb 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -953,6 +953,16 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string return fsMeta.ToObjectInfo(bucket, object, fi), nil } +// DeleteObjects - deletes an object from a bucket, this operation is destructive +// and there are no rollbacks supported. 
+func (fs *FSObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = fs.DeleteObject(ctx, bucket, object) + } + return errs, nil +} + // DeleteObject - deletes an object from a bucket, this operation is destructive // and there are no rollbacks supported. func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) error { diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index 1404a035d..0df4d3621 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -896,6 +896,14 @@ func (a *azureObjects) DeleteObject(ctx context.Context, bucket, object string) return nil } +func (a *azureObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = a.DeleteObject(ctx, bucket, object) + } + return errs, nil +} + // ListMultipartUploads - It's decided not to support List Multipart Uploads, hence returning empty result. func (a *azureObjects) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result minio.ListMultipartsInfo, err error) { // It's decided not to support List Multipart Uploads, hence returning empty result. 
diff --git a/cmd/gateway/b2/gateway-b2.go b/cmd/gateway/b2/gateway-b2.go index 3a944398a..50bf4ddf4 100644 --- a/cmd/gateway/b2/gateway-b2.go +++ b/cmd/gateway/b2/gateway-b2.go @@ -595,6 +595,14 @@ func (l *b2Objects) DeleteObject(ctx context.Context, bucket string, object stri return b2ToObjectError(err, bucket, object) } +func (l *b2Objects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = l.DeleteObject(ctx, bucket, object) + } + return errs, nil +} + // ListMultipartUploads lists all multipart uploads. func (l *b2Objects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) { diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 5aa0d6c09..8d1c2cde3 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -955,6 +955,14 @@ func (l *gcsGateway) DeleteObject(ctx context.Context, bucket string, object str return nil } +func (l *gcsGateway) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = l.DeleteObject(ctx, bucket, object) + } + return errs, nil +} + // NewMultipartUpload - upload object in multiple parts func (l *gcsGateway) NewMultipartUpload(ctx context.Context, bucket string, key string, o minio.ObjectOptions) (uploadID string, err error) { // generate new uploadid diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go index 60535772c..9473dfaff 100644 --- a/cmd/gateway/hdfs/gateway-hdfs.go +++ b/cmd/gateway/hdfs/gateway-hdfs.go @@ -400,6 +400,14 @@ func (n *hdfsObjects) DeleteObject(ctx context.Context, bucket, object string) e return hdfsToObjectErr(ctx, 
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), minio.PathJoin(hdfsSeparator, bucket, object)), bucket, object) } +func (n *hdfsObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = n.DeleteObject(ctx, bucket, object) + } + return errs, nil +} + func (n *hdfsObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) { objInfo, err := n.GetObjectInfo(ctx, bucket, object, opts) if err != nil { diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index c3195f81a..bb63f42ff 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -710,6 +710,14 @@ func (l *ossObjects) DeleteObject(ctx context.Context, bucket, object string) er return nil } +func (l *ossObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = l.DeleteObject(ctx, bucket, object) + } + return errs, nil +} + // fromOSSClientListMultipartsInfo converts oss ListMultipartUploadResult to ListMultipartsInfo func fromOSSClientListMultipartsInfo(lmur oss.ListMultipartUploadResult) minio.ListMultipartsInfo { uploads := make([]minio.MultipartInfo, len(lmur.Uploads)) diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index c49cf550c..0a38ac69a 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -500,6 +500,14 @@ func (l *s3Objects) DeleteObject(ctx context.Context, bucket string, object stri return nil } +func (l *s3Objects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = l.DeleteObject(ctx, 
bucket, object) + } + return errs, nil +} + // ListMultipartUploads lists all multipart uploads. func (l *s3Objects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, e error) { result, err := l.Client.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index a5db9ed70..6b20eba15 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -167,6 +167,14 @@ func (d *naughtyDisk) DeleteFile(volume string, path string) (err error) { return d.disk.DeleteFile(volume, path) } +func (d *naughtyDisk) DeleteFileBulk(volume string, paths []string) ([]error, error) { + errs := make([]error, len(paths)) + for idx, path := range paths { + errs[idx] = d.disk.DeleteFile(volume, path) + } + return errs, nil +} + func (d *naughtyDisk) WriteAll(volume string, path string, buf []byte) (err error) { if err := d.calcError(); err != nil { return err diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index ec3be7361..ceb80bb7b 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -144,6 +144,78 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string) return err } +// Cleanup objects in bulk and recursively: each object will have a list of sub-files to delete in the backend +func cleanupObjectsBulk(ctx context.Context, storage StorageAPI, volume string, objsPaths []string, errs []error) ([]error, error) { + // The list of files in disk to delete + var filesToDelete []string + // Map files to delete to the passed objsPaths + var filesToDeleteObjsIndexes []int + + // Traverse and return the list of sub entries + var traverse func(string) ([]string, error) + traverse = func(entryPath string) ([]string, error) { + var output = make([]string, 0) + if !hasSuffix(entryPath, slashSeparator) { + output = 
append(output, entryPath) + return output, nil + } + entries, err := storage.ListDir(volume, entryPath, -1, "") + if err != nil { + if err == errFileNotFound { + return nil, nil + } + return nil, err + } + + for _, entry := range entries { + subEntries, err := traverse(pathJoin(entryPath, entry)) + if err != nil { + return nil, err + } + output = append(output, subEntries...) + } + return output, nil + } + + // Find and collect the list of files to remove associated + // to the passed objects paths + for idx, objPath := range objsPaths { + if errs[idx] != nil { + continue + } + output, err := traverse(objPath) + if err != nil { + errs[idx] = err + continue + } else { + errs[idx] = nil + } + filesToDelete = append(filesToDelete, output...) + for i := 0; i < len(output); i++ { + filesToDeleteObjsIndexes = append(filesToDeleteObjsIndexes, idx) + } + } + + // Reverse the list so remove can succeed + reverseStringSlice(filesToDelete) + + dErrs, err := storage.DeleteFileBulk(volume, filesToDelete) + if err != nil { + return nil, err + } + + // Map files deletion errors to the correspondent objects + for i := range dErrs { + if dErrs[i] != nil { + if errs[filesToDeleteObjsIndexes[i]] != nil { + errs[filesToDeleteObjsIndexes[i]] = dErrs[i] + } + } + } + + return errs, nil +} + // Removes notification.xml for a given bucket, only used during DeleteBucket. func removeNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucket string) error { // Verify bucket is valid. 
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 58dc1bced..6833f8f62 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -73,6 +73,7 @@ type ObjectLayer interface { PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) DeleteObject(ctx context.Context, bucket, object string) error + DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) // Multipart operations. ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) diff --git a/cmd/posix.go b/cmd/posix.go index 3bc837fe9..5ee157a2c 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -1388,6 +1388,14 @@ func (s *posix) DeleteFile(volume, path string) (err error) { return deleteFile(volumeDir, filePath) } +func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err error) { + errs = make([]error, len(paths)) + for idx, path := range paths { + errs[idx] = s.DeleteFile(volume, path) + } + return +} + // RenameFile - rename source path to destination path atomically. func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { defer func() { diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 6c0cba8a7..55dc66362 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -47,6 +47,7 @@ type StorageAPI interface { RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error StatFile(volume string, path string) (file FileInfo, err error) DeleteFile(volume string, path string) (err error) + DeleteFileBulk(volume string, paths []string) (errs []error, err error) // Write all data, syncs the data to disk. 
WriteAll(volume string, path string, buf []byte) (err error) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index a5736ef59..48d6721ff 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -356,6 +356,34 @@ func (client *storageRESTClient) DeleteFile(volume, path string) error { return err } +// DeleteFileBulk - deletes files in bulk. +func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) (errs []error, err error) { + errs = make([]error, len(paths)) + values := make(url.Values) + values.Set(storageRESTVolume, volume) + for _, path := range paths { + values.Add(storageRESTFilePath, path) + } + respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, nil, -1) + defer http.DrainBody(respBody) + + if err != nil { + return nil, err + } + + bulkErrs := bulkErrorsResponse{} + gob.NewDecoder(respBody).Decode(&bulkErrs) + if err != nil { + return nil, err + } + + for i, dErr := range bulkErrs.Errs { + errs[i] = toStorageErr(dErr) + } + + return errs, nil +} + // RenameFile - renames a file. 
func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { values := make(url.Values) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 0b0770077..7e087f42b 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -35,6 +35,7 @@ const ( storageRESTMethodReadFileStream = "readfilestream" storageRESTMethodListDir = "listdir" storageRESTMethodDeleteFile = "deletefile" + storageRESTMethodDeleteFileBulk = "deletefilebulk" storageRESTMethodRenameFile = "renamefile" storageRESTMethodGetInstanceID = "getinstanceid" ) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index c26e8e308..702166ca8 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -47,6 +47,22 @@ func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) w.(http.Flusher).Flush() } +type bulkErrorsResponse struct { + Errs []error `json:"errors"` +} + +func (s *storageRESTServer) writeErrorsResponse(w http.ResponseWriter, errs []error) { + resp := bulkErrorsResponse{Errs: make([]error, len(errs))} + for idx, err := range errs { + if err == nil { + continue + } + resp.Errs[idx] = err + } + gob.NewEncoder(w).Encode(resp) + w.(http.Flusher).Flush() +} + // DefaultSkewTime - skew time is 15 minutes between minio peers. const DefaultSkewTime = 15 * time.Minute @@ -391,6 +407,24 @@ func (s *storageRESTServer) DeleteFileHandler(w http.ResponseWriter, r *http.Req } } +// DeleteFileBulkHandler - delete a file. +func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := r.URL.Query() + volume := vars.Get(storageRESTVolume) + filePaths := vars[storageRESTFilePath] + + errs, err := s.storage.DeleteFileBulk(volume, filePaths) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + s.writeErrorsResponse(w, errs) +} + // RenameFileHandler - rename a file. 
func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -447,6 +481,9 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) { Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)). Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodGetInstanceID).HandlerFunc(httpTraceAll(server.GetInstanceID)) diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index a2e39a7ed..b77bf9e36 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -1927,6 +1927,10 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [ globalIAMSys = NewIAMSys() globalIAMSys.Init(objLayer) + + globalPolicySys = NewPolicySys() + globalPolicySys.Init(objLayer) + // initialize the server and obtain the credentials and root. // credentials are necessary to sign the HTTP request. 
if err = newTestConfig(globalMinioDefaultRegion, objLayer); err != nil { diff --git a/cmd/utils.go b/cmd/utils.go index 003e9a351..65c56beb8 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -474,3 +474,10 @@ func restQueries(keys ...string) []string { } return accumulator } + +// Reverse the input order of a slice of string +func reverseStringSlice(input []string) { + for left, right := 0, len(input)-1; left < right; left, right = left+1, right-1 { + input[left], input[right] = input[right], input[left] + } +} diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index ed68203b1..b45cc13e0 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -466,9 +466,14 @@ func hashKey(algo string, key string, cardinality int) int { } } +// Returns always a same erasure coded set for a given input. +func (s *xlSets) getHashedSetIndex(input string) int { + return hashKey(s.distributionAlgo, input, len(s.sets)) +} + // Returns always a same erasure coded set for a given input. func (s *xlSets) getHashedSet(input string) (set *xlObjects) { - return s.sets[hashKey(s.distributionAlgo, input, len(s.sets))] + return s.sets[s.getHashedSetIndex(input)] } // GetBucketInfo - returns bucket info from one of the erasure coded set. @@ -618,6 +623,58 @@ func (s *xlSets) DeleteObject(ctx context.Context, bucket string, object string) return s.getHashedSet(object).DeleteObject(ctx, bucket, object) } +// DeleteObjects - bulk delete of objects +// Bulk delete is only possible within one set. 
For that purpose +// objects are group by set first, and then bulk delete is invoked +// for each set, the error response of each delete will be returned +func (s *xlSets) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + + type delObj struct { + // Set index associated to this object + setIndex int + // Original index from the list of arguments + // where this object is passed + origIndex int + // Object name + name string + } + + // Transform []delObj to the list of object names + toNames := func(delObjs []delObj) []string { + names := make([]string, len(delObjs)) + for i, obj := range delObjs { + names[i] = obj.name + } + return names + } + + // The result of delete operation on all passed objects + var delErrs = make([]error, len(objects)) + + // A map between a set and its associated objects + var objSetMap = make(map[int][]delObj) + + // Group objects by set index + for i, object := range objects { + index := s.getHashedSetIndex(object) + objSetMap[index] = append(objSetMap[index], delObj{setIndex: index, origIndex: i, name: object}) + } + + // Invoke bulk delete on objects per set and save + // the result of the delete operation + for _, objsGroup := range objSetMap { + errs, err := s.getHashedSet(objsGroup[0].name).DeleteObjects(ctx, bucket, toNames(objsGroup)) + if err != nil { + return nil, err + } + for i, obj := range objsGroup { + delErrs[obj.origIndex] = errs[i] + } + } + + return delErrs, nil +} + // CopyObject - copies objects from one hashedSet to another hashedSet, on server side. 
func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) { srcSet := s.getHashedSet(srcObject) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 1979c57d1..8f9b9bf83 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -824,6 +824,199 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri return reduceWriteQuorumErrs(ctx, dErrs, objectOpIgnoredErrs, writeQuorum) } +// deleteObject - wrapper for delete object, deletes an object from +// all the disks in parallel, including `xl.json` associated with the +// object. +func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects []string, errs []error, writeQuorums []int, isDirs []bool) ([]error, error) { + var tmpObjs = make([]string, len(objects)) + var disks = xl.getDisks() + + if bucket == minioMetaTmpBucket { + copy(tmpObjs, objects) + } else { + for i, object := range objects { + if errs[i] != nil { + continue + } + var err error + tmpObjs[i] = mustGetUUID() + // Rename the current object while requiring write quorum, but also consider + // that a non found object in a given disk as a success since it already + // confirms that the object doesn't have a part in that disk (already removed) + if isDirs[i] { + disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i], + []error{errFileNotFound, errFileAccessDenied}) + } else { + disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i], + []error{errFileNotFound}) + } + if err != nil { + errs[i] = err + } + } + } + + // Initialize sync waitgroup. + var wg = &sync.WaitGroup{} + // Initialize list of errors. 
+ var opErrs = make([]error, len(disks)) + var delObjErrs = make([][]error, len(disks)) + + for index, disk := range disks { + if disk == nil { + opErrs[index] = errDiskNotFound + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + delObjErrs[index], opErrs[index] = cleanupObjectsBulk(ctx, disk, minioMetaTmpBucket, tmpObjs, errs) + if opErrs[index] == errVolumeNotFound { + opErrs[index] = nil + } + }(index, disk) + } + + // Wait for all routines to finish. + wg.Wait() + + // Return errors if any during deletion + if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(xl.getDisks())/2+1); err != nil { + return nil, err + } + + // Reduce errors for each object + for objIndex := range objects { + if errs[objIndex] != nil { + continue + } + listErrs := make([]error, len(xl.getDisks())) + for i := range delObjErrs { + if delObjErrs[i] != nil { + listErrs[i] = delObjErrs[i][objIndex] + } + } + errs[objIndex] = reduceWriteQuorumErrs(ctx, listErrs, objectOpIgnoredErrs, writeQuorums[objIndex]) + } + + return errs, nil +} + +func (xl xlObjects) deleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + writeQuorums := make([]int, len(objects)) + isObjectDirs := make([]bool, len(objects)) + + for i, object := range objects { + errs[i] = checkDelObjArgs(ctx, bucket, object) + } + + var objectLocks = make([]RWLocker, len(objects)) + + for i, object := range objects { + if errs[i] != nil { + continue + } + // Acquire a write lock before deleting the object. 
+ objectLocks[i] = xl.nsMutex.NewNSLock(bucket, object) + if errs[i] = objectLocks[i].GetLock(globalOperationTimeout); errs[i] != nil { + continue + } + defer objectLocks[i].Unlock() + } + + for i, object := range objects { + isObjectDirs[i] = hasSuffix(object, slashSeparator) + } + + for i, object := range objects { + if isObjectDirs[i] { + _, err := xl.getObjectInfoDir(ctx, bucket, object) + if err == errXLReadQuorum { + if isObjectDirDangling(statAllDirs(ctx, xl.getDisks(), bucket, object)) { + // If object is indeed dangling, purge it. + errs[i] = nil + } + } + if err != nil { + errs[i] = toObjectErr(err, bucket, object) + continue + } + } + } + + for i, object := range objects { + if errs[i] != nil { + continue + } + if isObjectDirs[i] { + writeQuorums[i] = len(xl.getDisks())/2 + 1 + } else { + var err error + // Read metadata associated with the object from all disks. + partsMetadata, readXLErrs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object) + // get Quorum for this object + _, writeQuorums[i], err = objectQuorumFromMeta(ctx, xl, partsMetadata, readXLErrs) + if err != nil { + errs[i] = toObjectErr(err, bucket, object) + continue + } + } + } + + return xl.doDeleteObjects(ctx, bucket, objects, errs, writeQuorums, isObjectDirs) +} + +// DeleteObjects deletes objects in bulk, this function will still automatically split objects list +// into smaller bulks if some object names are found to be duplicated in the delete list, splitting +// into smaller bulks will avoid holding twice the write lock of the duplicated object names. 
+func (xl xlObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + + var ( + i, start, end int + // Deletion result for all objects + deleteErrs []error + // Object names store will be used to check for object name duplication + objectNamesStore = make(map[string]interface{}) + ) + + for { + if i >= len(objects) { + break + } + + object := objects[i] + + _, duplicationFound := objectNamesStore[object] + if duplicationFound { + end = i - 1 + } else { + objectNamesStore[object] = true + end = i + } + + if duplicationFound || i == len(objects)-1 { + errs, err := xl.deleteObjects(ctx, bucket, objects[start:end+1]) + if err != nil { + return nil, err + } + deleteErrs = append(deleteErrs, errs...) + objectNamesStore = make(map[string]interface{}) + } + + if duplicationFound { + // Avoid to increase the index if object + // name is found to be duplicated. + start = i + end = i + } else { + i++ + } + } + + return deleteErrs, nil +} + // DeleteObject - deletes an object, this call doesn't necessary reply // any error as it is not necessary for the handler to reply back a // response to the client request. diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index b6bba311e..23dc3c37e 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -111,6 +111,80 @@ func TestXLDeleteObjectBasic(t *testing.T) { // Cleanup backend directories removeRoots(fsDirs) } + +func TestXLDeleteObjectsXLSet(t *testing.T) { + + var objs []*xlObjects + for i := 0; i < 32; i++ { + obj, fsDirs, err := prepareXL(16) + if err != nil { + t.Fatal("Unable to initialize 'XL' object layer.", err) + } + // Remove all dirs. 
+ for _, dir := range fsDirs { + defer os.RemoveAll(dir) + } + objs = append(objs, obj.(*xlObjects)) + } + + xlSets := &xlSets{sets: objs, distributionAlgo: "CRCMOD"} + + type testCaseType struct { + bucket string + object string + } + + bucketName := "bucket" + testCases := []testCaseType{ + {bucketName, "dir/obj1"}, + {bucketName, "dir/obj2"}, + {bucketName, "obj3"}, + {bucketName, "obj_4"}, + } + + err := xlSets.MakeBucketWithLocation(context.Background(), bucketName, "") + if err != nil { + t.Fatal(err) + } + + for _, testCase := range testCases { + _, err = xlSets.PutObject(context.Background(), testCase.bucket, testCase.object, + mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{}) + if err != nil { + t.Fatalf("XL Object upload failed: %s", err) + } + } + + toObjectNames := func(testCases []testCaseType) []string { + names := make([]string, len(testCases)) + for i := range testCases { + names[i] = testCases[i].object + } + return names + } + + objectNames := toObjectNames(testCases) + delErrs, err := xlSets.DeleteObjects(context.Background(), bucketName, objectNames) + if err != nil { + t.Errorf("Failed to call DeleteObjects with the error: `%v`", err) + } + + for i := range delErrs { + if delErrs[i] != nil { + t.Errorf("Failed to remove object `%v` with the error: `%v`", objectNames[i], delErrs[i]) + } + } + + for _, test := range testCases { + _, statErr := xlSets.GetObjectInfo(context.Background(), test.bucket, test.object, ObjectOptions{}) + switch statErr.(type) { + case ObjectNotFound: + default: + t.Fatalf("Object %s is not removed", test.bucket+"/"+test.object) + } + } +} + func TestXLDeleteObjectDiskNotFound(t *testing.T) { // Reset global storage class flags resetGlobalStorageEnvs()