Fix error handling in DeleteFileBulk storage handler (#8327)

errors.errorString() cannot be marshalled by gob
encoder, so using a slice of []error would fail
to be encoded. This leads to no errors being
generated instead gob.Decoder on the storage-client
would see an io.EOF

To avoid such bugs introduce a typed error for
handling such translations and register this type
for gob encoding support.
master
Harshavardhana 5 years ago committed by GitHub
parent 4ec9b349d0
commit f45977d371
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      cmd/server-main.go
  2. 10
      cmd/storage-rest-client.go
  3. 39
      cmd/storage-rest-server.go

@ -37,6 +37,7 @@ func init() {
logger.Init(GOPATH, GOROOT) logger.Init(GOPATH, GOROOT)
logger.RegisterUIError(fmtError) logger.RegisterUIError(fmtError)
gob.Register(HashMismatchError{}) gob.Register(HashMismatchError{})
gob.Register(DeleteFileError(""))
} }
// ServerFlags - server command specific flags // ServerFlags - server command specific flags

@ -375,7 +375,6 @@ func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) (
if len(paths) == 0 { if len(paths) == 0 {
return errs, err return errs, err
} }
errs = make([]error, len(paths))
values := make(url.Values) values := make(url.Values)
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
for _, path := range paths { for _, path := range paths {
@ -388,14 +387,13 @@ func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) (
return nil, err return nil, err
} }
bulkErrs := bulkErrorsResponse{} dErrResp := &DeleteFileBulkErrsResp{}
gob.NewDecoder(respBody).Decode(&bulkErrs) if err = gob.NewDecoder(respBody).Decode(dErrResp); err != nil {
if err != nil {
return nil, err return nil, err
} }
for i, dErr := range bulkErrs.Errs { for _, dErr := range dErrResp.Errs {
errs[i] = toStorageErr(dErr) errs = append(errs, toStorageErr(dErr))
} }
return errs, nil return errs, nil

@ -49,22 +49,6 @@ func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error)
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
type bulkErrorsResponse struct {
Errs []error `json:"errors"`
}
func (s *storageRESTServer) writeErrorsResponse(w http.ResponseWriter, errs []error) {
resp := bulkErrorsResponse{Errs: make([]error, len(errs))}
for idx, err := range errs {
if err == nil {
continue
}
resp.Errs[idx] = err
}
gob.NewEncoder(w).Encode(resp)
w.(http.Flusher).Flush()
}
// DefaultSkewTime - skew time is 15 minutes between minio peers. // DefaultSkewTime - skew time is 15 minutes between minio peers.
const DefaultSkewTime = 15 * time.Minute const DefaultSkewTime = 15 * time.Minute
@ -455,6 +439,19 @@ func (s *storageRESTServer) DeleteFileHandler(w http.ResponseWriter, r *http.Req
} }
} }
// DeleteFileBulkErrsResp - collection of deleteFile errors
// for bulk deletes
type DeleteFileBulkErrsResp struct {
Errs []error
}
// DeleteFileError - error captured per delete operation
type DeleteFileError string
func (d DeleteFileError) Error() string {
return string(d)
}
// DeleteFileBulkHandler - delete a file. // DeleteFileBulkHandler - delete a file.
func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http.Request) { func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) { if !s.IsValid(w, r) {
@ -470,7 +467,15 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http
return return
} }
s.writeErrorsResponse(w, errs) derrsResp := &DeleteFileBulkErrsResp{Errs: make([]error, len(errs))}
for idx, err := range errs {
if err != nil {
derrsResp.Errs[idx] = DeleteFileError(err.Error())
}
}
gob.NewEncoder(w).Encode(derrsResp)
w.(http.Flusher).Flush()
} }
// RenameFileHandler - rename a file. // RenameFileHandler - rename a file.

Loading…
Cancel
Save