From 7ce63b3078ea8bc82ea332054267888724d88fce Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 2 Feb 2020 07:41:29 +0530 Subject: [PATCH] fix: multi-delete API write quorum failures (#8926) multi-delete API failed with write quorum errors under following situations - list of files requested for delete doesn't exist anymore can lead to quorum errors and failure - due to usage of query param for paths, for really long paths MinIO server rejects these requests as malformed as unexpected. This was reproduced with warp --- cmd/storage-rest-client.go | 10 +++++++--- cmd/storage-rest-common.go | 2 +- cmd/storage-rest-server.go | 15 +++++++++++++-- cmd/xl-v1-object.go | 13 ++++++------- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 1ce3798f4..f2aff4c32 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -245,7 +245,7 @@ func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte) values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) - reader := bytes.NewBuffer(buffer) + reader := bytes.NewReader(buffer) respBody, err := client.call(storageRESTMethodAppendFile, values, reader, -1) defer http.DrainBody(respBody) return err @@ -405,10 +405,14 @@ func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) ( } values := make(url.Values) values.Set(storageRESTVolume, volume) + + var buffer bytes.Buffer for _, path := range paths { - values.Add(storageRESTFilePath, path) + buffer.WriteString(path) + buffer.WriteString("\n") } - respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, nil, -1) + + respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, &buffer, -1) defer http.DrainBody(respBody) if err != nil { diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index e5a5920f8..597e09101 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v13" // Introduced StorageErr error type. + storageRESTVersion = "v14" // DeleteFileBulk API change storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 0abe16b6f..d8dbd38a0 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -17,6 +17,7 @@ package cmd import ( + "bufio" "context" "encoding/gob" "encoding/hex" @@ -502,7 +503,17 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http } vars := r.URL.Query() volume := vars.Get(storageRESTVolume) - filePaths := vars[storageRESTFilePath] + + bio := bufio.NewScanner(r.Body) + var filePaths []string + for bio.Scan() { + filePaths = append(filePaths, bio.Text()) + } + + if err := bio.Err(); err != nil { + s.writeErrorResponse(w, err) + return + } errs, err := s.storage.DeleteFileBulk(volume, filePaths) if err != nil { @@ -664,7 +675,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)). - Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + Queries(restQueries(storageRESTVolume)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)). Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index b53c44881..7affa7745 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -738,8 +738,7 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri // object. func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects []string, errs []error, writeQuorums []int, isDirs []bool) ([]error, error) { var tmpObjs = make([]string, len(objects)) - var disks = xl.getDisks() - + disks := xl.getDisks() if bucket == minioMetaTmpBucket { copy(tmpObjs, objects) } else { @@ -753,10 +752,10 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects // that a non found object in a given disk as a success since it already // confirms that the object doesn't have a part in that disk (already removed) if isDirs[i] { - disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i], + disks, err = rename(ctx, disks, bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i], []error{errFileNotFound, errFileAccessDenied}) } else { - disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i], + disks, err = rename(ctx, disks, bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i], []error{errFileNotFound}) } if err != nil { @@ -776,13 +775,13 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects continue } delObjErrs[index], opErrs[index] = cleanupObjectsBulk(disk, minioMetaTmpBucket, tmpObjs, errs) - if opErrs[index] == errVolumeNotFound { + if opErrs[index] == errVolumeNotFound || opErrs[index] == errFileNotFound { opErrs[index] = nil } } // Return errors if any during deletion - if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(xl.getDisks())/2+1); err != nil { + if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(disks)/2+1); err != nil { return nil, err } @@ -791,7 +790,7 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects if errs[objIndex] != nil { continue } - listErrs := make([]error, len(xl.getDisks())) + listErrs := make([]error, len(disks)) for i := range delObjErrs { if delObjErrs[i] != nil { listErrs[i] = delObjErrs[i][objIndex]