From 91588209fa69386dcbfee154363bb9a069ca4e60 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 20 Apr 2016 17:23:23 -0700 Subject: [PATCH] obj: Object api handle all errors in common location. (#1343) --- api-errors.go | 14 +++++++ object-api-multipart.go | 75 +++++++---------------------------- object-api.go | 86 +++++++++-------------------------------- object-errors.go | 58 ++++++++++++++++++++++++++- storage-errors.go | 5 ++- web-handlers.go | 4 ++ xl-v1-createfile.go | 4 +- 7 files changed, 113 insertions(+), 133 deletions(-) diff --git a/api-errors.go b/api-errors.go index 6dc73bf23..0b6fd2d13 100644 --- a/api-errors.go +++ b/api-errors.go @@ -105,6 +105,10 @@ const ( ErrInvalidQuerySignatureAlgo ErrInvalidQueryParams // Add new error codes here. + + // Extended errors. + ErrInsufficientReadResources + ErrInsufficientWriteResources ) // error code to APIError structure, these fields carry respective @@ -185,6 +189,16 @@ var errorCodeResponse = map[APIErrorCode]APIError{ Description: "We encountered an internal error, please try again.", HTTPStatusCode: http.StatusInternalServerError, }, + ErrInsufficientReadResources: { + Code: "InternalError", + Description: "We cannot satisfy sufficient read resources requested at this moment, please try again.", + HTTPStatusCode: http.StatusInternalServerError, + }, + ErrInsufficientWriteResources: { + Code: "InternalError", + Description: "We cannot satisfy sufficient write resources requested at this moment, please try again.", + HTTPStatusCode: http.StatusInternalServerError, + }, ErrInvalidAccessKeyID: { Code: "InvalidAccessKeyID", Description: "The access key ID you provided does not exist in our records.", diff --git a/object-api-multipart.go b/object-api-multipart.go index 3c56693d5..867a8ee71 100644 --- a/object-api-multipart.go +++ b/object-api-multipart.go @@ -87,14 +87,6 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke if !IsValidObjectPrefix(prefix) { return ListMultipartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: prefix}) } - if _, e := o.storage.StatVol(minioMetaVolume); e != nil { - if e == errVolumeNotFound { - e = o.storage.MakeVol(minioMetaVolume) - if e != nil { - return ListMultipartsInfo{}, probe.NewError(e) - } - } - } // Verify if delimiter is anything other than '/', which we do not support. if delimiter != "" && delimiter != slashSeparator { return ListMultipartsInfo{}, probe.NewError(UnsupportedDelimiter{ @@ -292,22 +284,21 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err uploadIDPath := path.Join(bucket, object, uploadID) if _, e = o.storage.StatFile(minioMetaVolume, uploadIDPath); e != nil { if e != errFileNotFound { - return "", probe.NewError(e) + return "", probe.NewError(toObjectErr(e, minioMetaVolume, uploadIDPath)) } // uploadIDPath doesn't exist, so create empty file to reserve the name var w io.WriteCloser if w, e = o.storage.CreateFile(minioMetaVolume, uploadIDPath); e == nil { // Just write some data for erasure code, rather than zero bytes. - w.Write([]byte(uploadID)) + if _, e = w.Write([]byte(uploadID)); e != nil { + return "", probe.NewError(toObjectErr(e, minioMetaVolume, uploadIDPath)) + } // Close the writer. if e = w.Close(); e != nil { return "", probe.NewError(e) } } else { - if e == errDiskFull { - return "", probe.NewError(StorageFull{}) - } - return "", probe.NewError(e) + return "", probe.NewError(toObjectErr(e, minioMetaVolume, uploadIDPath)) } return uploadID, nil } @@ -358,19 +349,7 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, partID, md5Hex) fileWriter, e := o.storage.CreateFile(minioMetaVolume, path.Join(bucket, object, partSuffix)) if e != nil { - if e == errVolumeNotFound { - return "", probe.NewError(BucketNotFound{ - Bucket: bucket, - }) - } else if e == errIsNotRegular { - return "", probe.NewError(ObjectExistsAsPrefix{ - Bucket: bucket, - Object: object, - }) - } else if e == errDiskFull { - return "", probe.NewError(StorageFull{}) - } - return "", probe.NewError(e) + return "", probe.NewError(toObjectErr(e, bucket, object)) } // Initialize md5 writer. @@ -383,10 +362,7 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si if size > 0 { if _, e = io.CopyN(multiWriter, data, size); e != nil { safeCloseAndRemove(fileWriter) - if e == io.ErrUnexpectedEOF || e == io.ErrShortWrite { - return "", probe.NewError(IncompleteBody{}) - } - return "", probe.NewError(e) + return "", probe.NewError(toObjectErr(e)) } // Reader shouldn't have more data what mentioned in size argument. // reading one more byte from the reader to validate it. @@ -398,7 +374,7 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si } else { if _, e = io.Copy(multiWriter, data); e != nil { safeCloseAndRemove(fileWriter) - return "", probe.NewError(e) + return "", probe.NewError(toObjectErr(e)) } } @@ -424,15 +400,6 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa if !IsValidObjectName(object) { return ListPartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // Create minio meta volume, if it doesn't exist yet. - if _, e := o.storage.StatVol(minioMetaVolume); e != nil { - if e == errVolumeNotFound { - e = o.storage.MakeVol(minioMetaVolume) - if e != nil { - return ListPartsInfo{}, probe.NewError(e) - } - } - } if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil { return ListPartsInfo{}, probe.NewError(e) } else if !status { @@ -445,10 +412,10 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa // Figure out the marker for the next subsequent calls, if the // partNumberMarker is already set. if partNumberMarker > 0 { - uploadIDPartPrefix := uploadIDPath + "." + strconv.Itoa(partNumberMarker) + "." - fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, uploadIDPartPrefix, "", false, 1) + partNumberMarkerPath := uploadIDPath + "." + strconv.Itoa(partNumberMarker) + "." + fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, partNumberMarkerPath, "", false, 1) if e != nil { - return result, probe.NewError(e) + return result, probe.NewError(toObjectErr(e, minioMetaVolume, partNumberMarkerPath)) } if len(fileInfos) == 0 { return result, probe.NewError(InvalidPart{}) @@ -521,19 +488,7 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI fileWriter, e := o.storage.CreateFile(bucket, object) if e != nil { - if e == errVolumeNotFound { - return "", probe.NewError(BucketNotFound{ - Bucket: bucket, - }) - } else if e == errIsNotRegular { - return "", probe.NewError(ObjectExistsAsPrefix{ - Bucket: bucket, - Object: object, - }) - } else if e == errDiskFull { - return "", probe.NewError(StorageFull{}) - } - return "", probe.NewError(e) + return "", probe.NewError(toObjectErr(e, bucket, object)) } var md5Sums []string @@ -585,16 +540,14 @@ func (o objectAPI) removeMultipartUpload(bucket, object, uploadID string) *probe if !IsValidObjectName(object) { return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - if _, e := o.storage.StatVol(minioMetaVolume); e != nil { - return nil - } marker := "" for { uploadIDPath := path.Join(bucket, object, uploadID) fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath, marker, false, 1000) if e != nil { - return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) + + return probe.NewError(InvalidUploadID{UploadID: uploadID}) } for _, fileInfo := range fileInfos { o.storage.DeleteFile(minioMetaVolume, fileInfo.Name) diff --git a/object-api.go b/object-api.go index 4bc3e1be2..58914ef5d 100644 --- a/object-api.go +++ b/object-api.go @@ -46,12 +46,16 @@ func (o objectAPI) MakeBucket(bucket string) *probe.Error { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } if e := o.storage.MakeVol(bucket); e != nil { - if e == errVolumeExists { - return probe.NewError(BucketExists{Bucket: bucket}) - } else if e == errDiskFull { - return probe.NewError(StorageFull{}) + return probe.NewError(toObjectErr(e, bucket)) + } + // This happens for the first time, but keep this here since this + // is the only place where it can be made expensive optimizing all + // other calls. + // Create minio meta volume, if it doesn't exist yet. + if e := o.storage.MakeVol(minioMetaVolume); e != nil { + if e != errVolumeExists { + return probe.NewError(toObjectErr(e, minioMetaVolume)) } - return probe.NewError(e) } return nil } @@ -64,10 +68,7 @@ func (o objectAPI) GetBucketInfo(bucket string) (BucketInfo, *probe.Error) { } vi, e := o.storage.StatVol(bucket) if e != nil { - if e == errVolumeNotFound { - return BucketInfo{}, probe.NewError(BucketNotFound{Bucket: bucket}) - } - return BucketInfo{}, probe.NewError(e) + return BucketInfo{}, probe.NewError(toObjectErr(e, bucket)) } return BucketInfo{ Name: bucket, @@ -82,7 +83,7 @@ func (o objectAPI) ListBuckets() ([]BucketInfo, *probe.Error) { var bucketInfos []BucketInfo vols, e := o.storage.ListVols() if e != nil { - return nil, probe.NewError(e) + return nil, probe.NewError(toObjectErr(e)) } for _, vol := range vols { // StorageAPI can send volume names which are incompatible @@ -107,12 +108,7 @@ func (o objectAPI) DeleteBucket(bucket string) *probe.Error { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } if e := o.storage.DeleteVol(bucket); e != nil { - if e == errVolumeNotFound { - return probe.NewError(BucketNotFound{Bucket: bucket}) - } else if e == errVolumeNotEmpty { - return probe.NewError(BucketNotEmpty{Bucket: bucket}) - } - return probe.NewError(e) + return probe.NewError(toObjectErr(e)) } return nil } @@ -131,17 +127,7 @@ func (o objectAPI) GetObject(bucket, object string, startOffset int64) (io.ReadC } r, e := o.storage.ReadFile(bucket, object, startOffset) if e != nil { - if e == errVolumeNotFound { - return nil, probe.NewError(BucketNotFound{Bucket: bucket}) - } else if e == errFileNotFound { - return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) - } else if e == errIsNotRegular { - return nil, probe.NewError(ObjectExistsAsPrefix{ - Bucket: bucket, - Object: object, - }) - } - return nil, probe.NewError(e) + return nil, probe.NewError(toObjectErr(e, bucket, object)) } return r, nil } @@ -158,14 +144,7 @@ func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Erro } fi, e := o.storage.StatFile(bucket, object) if e != nil { - if e == errVolumeNotFound { - return ObjectInfo{}, probe.NewError(BucketNotFound{Bucket: bucket}) - } else if e == errFileNotFound || e == errIsNotRegular { - return ObjectInfo{}, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) - // Handle more lower level errors if needed. - } else { - return ObjectInfo{}, probe.NewError(e) - } + return ObjectInfo{}, probe.NewError(toObjectErr(e, bucket, object)) } contentType := "application/octet-stream" if objectExt := filepath.Ext(object); objectExt != "" { @@ -213,19 +192,7 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R } fileWriter, e := o.storage.CreateFile(bucket, object) if e != nil { - if e == errVolumeNotFound { - return "", probe.NewError(BucketNotFound{ - Bucket: bucket, - }) - } else if e == errIsNotRegular { - return "", probe.NewError(ObjectExistsAsPrefix{ - Bucket: bucket, - Object: object, - }) - } else if e == errDiskFull { - return "", probe.NewError(StorageFull{}) - } - return "", probe.NewError(e) + return "", probe.NewError(toObjectErr(e, bucket, object)) } // Initialize md5 writer. @@ -240,10 +207,7 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R if clErr := safeCloseAndRemove(fileWriter); clErr != nil { return "", probe.NewError(clErr) } - if e == io.ErrUnexpectedEOF { - return "", probe.NewError(IncompleteBody{}) - } - return "", probe.NewError(e) + return "", probe.NewError(toObjectErr(e)) } } else { if _, e = io.Copy(multiWriter, data); e != nil { @@ -286,18 +250,7 @@ func (o objectAPI) DeleteObject(bucket, object string) *probe.Error { return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } if e := o.storage.DeleteFile(bucket, object); e != nil { - if e == errVolumeNotFound { - return probe.NewError(BucketNotFound{Bucket: bucket}) - } else if e == errFileNotFound { - return probe.NewError(ObjectNotFound{Bucket: bucket}) - } - if e == errFileNotFound { - return probe.NewError(ObjectNotFound{ - Bucket: bucket, - Object: object, - }) - } - return probe.NewError(e) + return probe.NewError(toObjectErr(e, bucket, object)) } return nil } @@ -331,10 +284,7 @@ func (o objectAPI) ListObjects(bucket, prefix, marker, delimiter string, maxKeys } fileInfos, eof, e := o.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys) if e != nil { - if e == errVolumeNotFound { - return ListObjectsInfo{}, probe.NewError(BucketNotFound{Bucket: bucket}) - } - return ListObjectsInfo{}, probe.NewError(e) + return ListObjectsInfo{}, probe.NewError(toObjectErr(e, bucket)) } if maxKeys == 0 { return ListObjectsInfo{}, nil diff --git a/object-errors.go b/object-errors.go index 1964806df..7b0a0f074 100644 --- a/object-errors.go +++ b/object-errors.go @@ -16,7 +16,49 @@ package main -import "fmt" +import ( + "fmt" + "io" +) + +// Converts underlying storage error. Convenience function written to +// handle all cases where we have known types of errors returned by +// underlying storage layer. +func toObjectErr(err error, params ...string) error { + switch err { + case errVolumeNotFound: + if len(params) >= 1 { + return BucketNotFound{Bucket: params[0]} + } + case errVolumeExists: + if len(params) >= 1 { + return BucketExists{Bucket: params[0]} + } + case errDiskFull: + return StorageFull{} + case errReadQuorum: + return StorageInsufficientReadResources{} + case errWriteQuorum: + return StorageInsufficientWriteResources{} + case errFileNotFound: + if len(params) >= 2 { + return ObjectNotFound{ + Bucket: params[0], + Object: params[1], + } + } + case errIsNotRegular: + if len(params) >= 2 { + return ObjectExistsAsPrefix{ + Bucket: params[0], + Object: params[1], + } + } + case io.ErrUnexpectedEOF, io.ErrShortWrite: + return IncompleteBody{} + } + return err +} // StorageFull storage ran out of space type StorageFull struct{} @@ -25,6 +67,20 @@ func (e StorageFull) Error() string { return "Storage reached its minimum free disk threshold." } +// StorageInsufficientReadResources storage cannot satisfy quorum for read operation. +type StorageInsufficientReadResources struct{} + +func (e StorageInsufficientReadResources) Error() string { + return "Storage resources are insufficient for the read operation." +} + +// StorageInsufficientWriteResources storage cannot satisfy quorum for write operation. +type StorageInsufficientWriteResources struct{} + +func (e StorageInsufficientWriteResources) Error() string { + return "Stroage resources are insufficient for the write operation." +} + // GenericError - generic object layer error. type GenericError struct { Bucket string diff --git a/storage-errors.go b/storage-errors.go index ce1141b62..068b2ddf5 100644 --- a/storage-errors.go +++ b/storage-errors.go @@ -44,4 +44,7 @@ var errVolumeAccessDenied = errors.New("volume access denied") var errFileAccessDenied = errors.New("file access denied") // errReadQuorum - did not meet read quorum. -var errReadQuorum = errors.New("I/O error. do not meet read quorum") +var errReadQuorum = errors.New("I/O error. did not meet read quorum.") + +// errWriteQuorum - did not meet write quorum. +var errWriteQuorum = errors.New("I/O error. did not meet write quorum.") diff --git a/web-handlers.go b/web-handlers.go index c24cd09c5..1b297961e 100644 --- a/web-handlers.go +++ b/web-handlers.go @@ -413,6 +413,10 @@ func writeWebErrorResponse(w http.ResponseWriter, err error) { apiErrCode = ErrNoSuchKey case ObjectNameInvalid: apiErrCode = ErrNoSuchKey + case StorageInsufficientWriteResources: + apiErrCode = ErrInsufficientWriteResources + case StorageInsufficientReadResources: + apiErrCode = ErrInsufficientReadResources default: apiErrCode = ErrInternalError } diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go index 01126c936..b95402e65 100644 --- a/xl-v1-createfile.go +++ b/xl-v1-createfile.go @@ -148,7 +148,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { // Remove previous temp writers for any failure. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) - reader.CloseWithError(err) + reader.CloseWithError(errWriteQuorum) return } @@ -166,7 +166,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { // Remove previous temp writers for any failure. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) - reader.CloseWithError(err) + reader.CloseWithError(errWriteQuorum) return }