diff --git a/cmd/fs-v1-helpers.go b/cmd/fs-v1-helpers.go index f8a187a87..a275c08b6 100644 --- a/cmd/fs-v1-helpers.go +++ b/cmd/fs-v1-helpers.go @@ -429,7 +429,7 @@ func fsDeleteFile(ctx context.Context, basePath, deletePath string) error { return err } - if err := deleteFile(basePath, deletePath); err != nil { + if err := deleteFile(basePath, deletePath, false); err != nil { if err != errFileNotFound { logger.LogIf(ctx, err) } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 3dda0b9a2..0c89b8d59 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -196,6 +196,13 @@ func (d *naughtyDisk) DeleteFileBulk(volume string, paths []string) ([]error, er return errs, nil } +func (d *naughtyDisk) DeletePrefixes(volume string, paths []string) ([]error, error) { + if err := d.calcError(); err != nil { + return nil, err + } + return d.disk.DeletePrefixes(volume, paths) +} + func (d *naughtyDisk) WriteAll(volume string, path string, reader io.Reader) (err error) { if err := d.calcError(); err != nil { return err diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 1830482d4..ba2c3b18f 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -147,78 +147,6 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string) return err } -// Cleanup objects in bulk and recursively: each object will have a list of sub-files to delete in the backend -func cleanupObjectsBulk(storage StorageAPI, volume string, objsPaths []string, errs []error) ([]error, error) { - // The list of files in disk to delete - var filesToDelete []string - // Map files to delete to the passed objsPaths - var filesToDeleteObjsIndexes []int - - // Traverse and return the list of sub entries - var traverse func(string) ([]string, error) - traverse = func(entryPath string) ([]string, error) { - var output = make([]string, 0) - if !HasSuffix(entryPath, SlashSeparator) { - output = append(output, entryPath) - return output, nil - } - entries, err := storage.ListDir(volume, entryPath, -1, "") - if err != nil { - if err == errFileNotFound { - return nil, nil - } - return nil, err - } - - for _, entry := range entries { - subEntries, err := traverse(pathJoin(entryPath, entry)) - if err != nil { - return nil, err - } - output = append(output, subEntries...) - } - return output, nil - } - - // Find and collect the list of files to remove associated - // to the passed objects paths - for idx, objPath := range objsPaths { - if errs[idx] != nil { - continue - } - output, err := traverse(retainSlash(pathJoin(objPath))) - if err != nil { - errs[idx] = err - continue - } else { - errs[idx] = nil - } - filesToDelete = append(filesToDelete, output...) - for i := 0; i < len(output); i++ { - filesToDeleteObjsIndexes = append(filesToDeleteObjsIndexes, idx) - } - } - - // Reverse the list so remove can succeed - reverseStringSlice(filesToDelete) - - dErrs, err := storage.DeleteFileBulk(volume, filesToDelete) - if err != nil { - return nil, err - } - - // Map files deletion errors to the correspondent objects - for i := range dErrs { - if dErrs[i] != nil { - if errs[filesToDeleteObjsIndexes[i]] != nil { - errs[filesToDeleteObjsIndexes[i]] = dErrs[i] - } - } - } - - return errs, nil -} - // Removes notification.xml for a given bucket, only used during DeleteBucket. func removeNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucket string) error { // Verify bucket is valid. diff --git a/cmd/posix-diskid-check.go b/cmd/posix-diskid-check.go index 240a1c156..2b265f220 100644 --- a/cmd/posix-diskid-check.go +++ b/cmd/posix-diskid-check.go @@ -179,6 +179,13 @@ func (p *posixDiskIDCheck) DeleteFileBulk(volume string, paths []string) (errs [ return p.storage.DeleteFileBulk(volume, paths) } +func (p *posixDiskIDCheck) DeletePrefixes(volume string, paths []string) (errs []error, err error) { + if p.isDiskStale() { + return nil, errDiskNotFound + } + return p.storage.DeletePrefixes(volume, paths) +} + func (p *posixDiskIDCheck) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error { if p.isDiskStale() { return errDiskNotFound diff --git a/cmd/posix.go b/cmd/posix.go index 4cbdc429f..049961cb3 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -1265,16 +1265,28 @@ func (s *posix) StatFile(volume, path string) (file FileInfo, err error) { }, nil } -// deleteFile deletes a file path if its empty. If it's successfully deleted, -// it will recursively move up the tree, deleting empty parent directories -// until it finds one with files in it. Returns nil for a non-empty directory. -func deleteFile(basePath, deletePath string) error { - if basePath == deletePath { +// deleteFile deletes a file or a directory if its empty unless recursive +// is set to true. If the target is successfully deleted, it will recursively +// move up the tree, deleting empty parent directories until it finds one +// with files in it. Returns nil for a non-empty directory even when +// recursive is set to false. +func deleteFile(basePath, deletePath string, recursive bool) error { + if basePath == "" || deletePath == "" { + return nil + } + basePath = filepath.Clean(basePath) + deletePath = filepath.Clean(deletePath) + if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath { return nil } - // Attempt to remove path. - if err := os.Remove((deletePath)); err != nil { + var err error + if recursive { + err = os.RemoveAll(deletePath) + } else { + err = os.Remove(deletePath) + } + if err != nil { switch { case isSysErrNotEmpty(err): // Ignore errors if the directory is not empty. The server relies on @@ -1297,12 +1309,58 @@ func deleteFile(basePath, deletePath string) error { deletePath = strings.TrimSuffix(deletePath, SlashSeparator) deletePath = slashpath.Dir(deletePath) - // Delete parent directory. Errors for parent directories shouldn't trickle down. - deleteFile(basePath, deletePath) + // Delete parent directory obviously not recursively. Errors for + // parent directories shouldn't trickle down. + deleteFile(basePath, deletePath, false) return nil } +// DeletePrefixes forcibly deletes all the contents of a set of specified paths. +// Parent directories are automatically removed if they become empty. err can +// bil nil while errs can contain some errors for corresponding objects. No error +// is set if a specified prefix path does not exist. +func (s *posix) DeletePrefixes(volume string, paths []string) (errs []error, err error) { + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + + volumeDir, err := s.getVolDir(volume) + if err != nil { + return nil, err + } + + // Stat a volume entry. + _, err = os.Stat(volumeDir) + if err != nil { + if os.IsNotExist(err) { + return nil, errVolumeNotFound + } else if os.IsPermission(err) { + return nil, errVolumeAccessDenied + } else if isSysErrIO(err) { + return nil, errFaultyDisk + } + return nil, err + } + + errs = make([]error, len(paths)) + // Following code is needed so that we retain SlashSeparator + // suffix if any in path argument. + for idx, path := range paths { + filePath := pathJoin(volumeDir, path) + errs[idx] = checkPathLength(filePath) + if errs[idx] != nil { + continue + } + // Delete file or a directory recursively, delete parent + // directory as well if its empty. + errs[idx] = deleteFile(volumeDir, filePath, true) + } + + return +} + // DeleteFile - delete a file at path. func (s *posix) DeleteFile(volume, path string) (err error) { atomic.AddInt32(&s.activeIOCount, 1) @@ -1316,7 +1374,7 @@ func (s *posix) DeleteFile(volume, path string) (err error) { } // Stat a volume entry. - _, err = os.Stat((volumeDir)) + _, err = os.Stat(volumeDir) if err != nil { if os.IsNotExist(err) { return errVolumeNotFound @@ -1331,12 +1389,12 @@ func (s *posix) DeleteFile(volume, path string) (err error) { // Following code is needed so that we retain SlashSeparator suffix if any in // path argument. filePath := pathJoin(volumeDir, path) - if err = checkPathLength((filePath)); err != nil { + if err = checkPathLength(filePath); err != nil { return err } // Delete file and delete parent directory as well if its empty. - return deleteFile(volumeDir, filePath) + return deleteFile(volumeDir, filePath, false) } func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err error) { @@ -1373,7 +1431,7 @@ func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err continue } // Delete file and delete parent directory as well if its empty. - errs[idx] = deleteFile(volumeDir, filePath) + errs[idx] = deleteFile(volumeDir, filePath, false) } return } @@ -1460,7 +1518,7 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e // Remove parent dir of the source file if empty if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) { - deleteFile(srcVolumeDir, parentDir) + deleteFile(srcVolumeDir, parentDir, false) } return nil diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 394c3915c..bcd04e539 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -55,6 +55,7 @@ type StorageAPI interface { StatFile(volume string, path string) (file FileInfo, err error) DeleteFile(volume string, path string) (err error) DeleteFileBulk(volume string, paths []string) (errs []error, err error) + DeletePrefixes(volume string, paths []string) (errs []error, err error) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error // Write all data, syncs the data to disk. diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 1e95bfe34..9f5fd138b 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -414,13 +414,54 @@ func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) ( respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, &buffer, -1) defer http.DrainBody(respBody) + if err != nil { + return nil, err + } + reader, err := clearLeadingSpaces(respBody) if err != nil { return nil, err } dErrResp := &DeleteFileBulkErrsResp{} - if err = gob.NewDecoder(respBody).Decode(dErrResp); err != nil { + if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil { + return nil, err + } + + for _, dErr := range dErrResp.Errs { + errs = append(errs, toStorageErr(dErr)) + } + + return errs, nil +} + +// DeletePrefixes - deletes prefixes in bulk. +func (client *storageRESTClient) DeletePrefixes(volume string, paths []string) (errs []error, err error) { + if len(paths) == 0 { + return errs, err + } + values := make(url.Values) + values.Set(storageRESTVolume, volume) + + var buffer bytes.Buffer + for _, path := range paths { + buffer.WriteString(path) + buffer.WriteString("\n") + } + + respBody, err := client.call(storageRESTMethodDeletePrefixes, values, &buffer, -1) + defer http.DrainBody(respBody) + if err != nil { + return nil, err + } + + reader, err := clearLeadingSpaces(respBody) + if err != nil { + return nil, err + } + + dErrResp := &DeletePrefixesErrsResp{} + if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil { return nil, err } @@ -443,6 +484,22 @@ func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPa return err } +// clearLeadingSpaces removes all the first spaces returned from a reader. +func clearLeadingSpaces(r io.Reader) (io.Reader, error) { + reader := bufio.NewReader(r) + for { + b, err := reader.ReadByte() + if err != nil { + return nil, err + } + if b != ' ' { + reader.UnreadByte() + break + } + } + return reader, nil +} + func (client *storageRESTClient) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error { values := make(url.Values) values.Set(storageRESTVolume, volume) @@ -457,16 +514,9 @@ func (client *storageRESTClient) VerifyFile(volume, path string, size int64, alg if err != nil { return err } - reader := bufio.NewReader(respBody) - for { - b, err := reader.ReadByte() - if err != nil { - return err - } - if b != ' ' { - reader.UnreadByte() - break - } + reader, err := clearLeadingSpaces(respBody) + if err != nil { + return err } verifyResp := &VerifyFileResp{} if err = gob.NewDecoder(reader).Decode(verifyResp); err != nil { diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 597e09101..ace9dc456 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v14" // DeleteFileBulk API change + storageRESTVersion = "v15" // Adding DeletePrefixes API storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -42,6 +42,7 @@ const ( storageRESTMethodWalk = "/walk" storageRESTMethodDeleteFile = "/deletefile" storageRESTMethodDeleteFileBulk = "/deletefilebulk" + storageRESTMethodDeletePrefixes = "/deleteprefixes" storageRESTMethodRenameFile = "/renamefile" storageRESTMethodVerifyFile = "/verifyfile" ) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index c798d5b8e..790acf1cd 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -508,20 +508,69 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http return } + w.Header().Set(xhttp.ContentType, "text/event-stream") + encoder := gob.NewEncoder(w) + doneCh := sendWhiteSpaceToHTTPResponse(w) errs, err := s.storage.DeleteFileBulk(volume, filePaths) + <-doneCh if err != nil { s.writeErrorResponse(w, err) return } - derrsResp := &DeleteFileBulkErrsResp{Errs: make([]error, len(errs))} + dErrsResp := &DeleteFileBulkErrsResp{Errs: make([]error, len(errs))} for idx, err := range errs { if err != nil { - derrsResp.Errs[idx] = StorageErr(err.Error()) + dErrsResp.Errs[idx] = StorageErr(err.Error()) } } - gob.NewEncoder(w).Encode(derrsResp) + encoder.Encode(dErrsResp) + w.(http.Flusher).Flush() +} + +// DeletePrefixesErrsResp - collection of delete errors +// for bulk prefixes deletes +type DeletePrefixesErrsResp struct { + Errs []error +} + +// DeletePrefixesHandler - delete a set of a prefixes. +func (s *storageRESTServer) DeletePrefixesHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := r.URL.Query() + volume := vars.Get(storageRESTVolume) + + bio := bufio.NewScanner(r.Body) + var prefixes []string + for bio.Scan() { + prefixes = append(prefixes, bio.Text()) + } + + if err := bio.Err(); err != nil { + s.writeErrorResponse(w, err) + return + } + + w.Header().Set(xhttp.ContentType, "text/event-stream") + encoder := gob.NewEncoder(w) + doneCh := sendWhiteSpaceToHTTPResponse(w) + errs, err := s.storage.DeletePrefixes(volume, prefixes) + <-doneCh + if err != nil { + s.writeErrorResponse(w, err) + return + } + + dErrsResp := &DeletePrefixesErrsResp{Errs: make([]error, len(errs))} + for idx, err := range errs { + if err != nil { + dErrsResp.Errs[idx] = StorageErr(err.Error()) + } + } + encoder.Encode(dErrsResp) w.(http.Flusher).Flush() } @@ -665,6 +714,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)). Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTLeafFile)...) + subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeletePrefixes).HandlerFunc(httpTraceHdrs(server.DeletePrefixesHandler)). + Queries(restQueries(storageRESTVolume)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)). diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index aea9ac5ee..5a35da219 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -23,6 +23,7 @@ import ( "io" "net/http" "path" + "sync" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -781,19 +782,26 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects // Initialize list of errors. var opErrs = make([]error, len(disks)) var delObjErrs = make([][]error, len(disks)) + var wg = sync.WaitGroup{} // Remove objects in bulk for each disk - for index, disk := range disks { - if disk == nil { - opErrs[index] = errDiskNotFound + for i, d := range disks { + if d == nil { + opErrs[i] = errDiskNotFound continue } - delObjErrs[index], opErrs[index] = cleanupObjectsBulk(disk, minioMetaTmpBucket, tmpObjs, errs) - if opErrs[index] == errVolumeNotFound || opErrs[index] == errFileNotFound { - opErrs[index] = nil - } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + delObjErrs[index], opErrs[index] = disk.DeletePrefixes(minioMetaTmpBucket, tmpObjs) + if opErrs[index] == errVolumeNotFound || opErrs[index] == errFileNotFound { + opErrs[index] = nil + } + }(i, d) } + wg.Wait() + // Return errors if any during deletion if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(disks)/2+1); err != nil { return nil, err @@ -805,9 +813,14 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects continue } listErrs := make([]error, len(disks)) + // Iterate over disks to fetch the error + // of deleting of the current object for i := range delObjErrs { + // delObjErrs[i] is not nil when disks[i] is also not nil if delObjErrs[i] != nil { - listErrs[i] = delObjErrs[i][objIndex] + if delObjErrs[i][objIndex] != errFileNotFound { + listErrs[i] = delObjErrs[i][objIndex] + } } } errs[objIndex] = reduceWriteQuorumErrs(ctx, listErrs, objectOpIgnoredErrs, writeQuorums[objIndex])