XL: Rename, cleanup and add more comments. (#1769)

- xl-v1-bucket.go - removes a whole bunch of code.
- {xl-v1,fs-v1}-metadata.go - add a lot of comments and rename functions appropriately.
branch master
Harshavardhana authored 9 years ago, committed by Harshavardhana
parent 553fdb9211
commit b2293c2bf4
  1. docs/backend/README.md (0)
  2. docs/backend/fs/format.json (0)
  3. docs/backend/fs/fs.json (0)
  4. docs/backend/fs/uploads.json (0)
  5. docs/backend/xl/format.json (0)
  6. docs/backend/xl/uploads.json (0)
  7. docs/backend/xl/xl.json (0)
  8. erasure.go (9)
  9. fs-v1-metadata.go (51)
  10. fs-v1-multipart.go (9)
  11. httprange.go (7)
  12. object-handlers.go (2)
  13. tree-walk-xl.go (5)
  14. xl-v1-bucket.go (242)
  15. xl-v1-metadata.go (97)
  16. xl-v1-multipart-common.go (7)
  17. xl-v1-multipart.go (37)
  18. xl-v1-object.go (27)
  19. xl-v1-utils.go (4)

@ -16,11 +16,7 @@
package main
import (
"errors"
"github.com/klauspost/reedsolomon"
)
import "github.com/klauspost/reedsolomon"
// erasure storage layer.
type erasure struct {
@ -30,9 +26,6 @@ type erasure struct {
storageDisks []StorageAPI
}
// errUnexpected - returned for any unexpected error.
var errUnexpected = errors.New("Unexpected error - please report at https://github.com/minio/minio/issues")
// newErasure instantiate a new erasure.
func newErasure(disks []StorageAPI) *erasure {
// Initialize E.

@ -44,28 +44,42 @@ func (m fsMetaV1) WriteTo(writer io.Writer) (n int64, err error) {
return int64(p), err
}
// SearchObjectPart - search object part name and etag.
func (m fsMetaV1) SearchObjectPart(number int) int {
// ObjectPartIndex - returns the index of the matching object part number.
func (m fsMetaV1) ObjectPartIndex(partNumber int) (partIndex int) {
for i, part := range m.Parts {
if number == part.Number {
return i
if partNumber == part.Number {
partIndex = i
return partIndex
}
}
return -1
}
// AddObjectPart - add a new object part in order.
func (m *fsMetaV1) AddObjectPart(number int, name string, etag string, size int64) {
m.Parts = append(m.Parts, objectPartInfo{
Number: number,
Name: name,
ETag: etag,
Size: size,
})
func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) {
partInfo := objectPartInfo{
Number: partNumber,
Name: partName,
ETag: partETag,
Size: partSize,
}
// Update part info if it already exists.
for i, part := range m.Parts {
if partNumber == part.Number {
m.Parts[i] = partInfo
return
}
}
// Proceed to include new part info.
m.Parts = append(m.Parts, partInfo)
// Parts in fsMeta should be in sorted order by part number.
sort.Sort(byPartNumber(m.Parts))
}
// readFSMetadata - read `fs.json`.
// readFSMetadata - returns the object metadata `fs.json` content.
func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) {
r, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0))
if err != nil {
@ -79,10 +93,17 @@ func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err
return fsMeta, nil
}
// writeFSMetadata - write `fs.json`.
func (fs fsObjects) writeFSMetadata(bucket, prefix string, fsMeta fsMetaV1) error {
// Initialize metadata map, save all erasure related metadata.
// newFSMetaV1 - initializes new fsMetaV1.
func newFSMetaV1() (fsMeta fsMetaV1) {
fsMeta = fsMetaV1{}
fsMeta.Version = "1"
fsMeta.Format = "fs"
fsMeta.Minio.Release = minioReleaseTag
return fsMeta
}
// writeFSMetadata - writes `fs.json` metadata.
func (fs fsObjects) writeFSMetadata(bucket, prefix string, fsMeta fsMetaV1) error {
w, err := fs.storage.CreateFile(bucket, path.Join(prefix, fsMetaJSONFile))
if err != nil {
return err

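The part bookkeeping added to fs-v1-metadata.go above (and mirrored in xl-v1-metadata.go further down) upserts a part by number and keeps the Parts slice sorted. A minimal standalone sketch of that behaviour, with simplified stand-in types (the meta struct and this objectPartInfo are illustrative, not the actual fsMetaV1):

package main

import (
	"fmt"
	"sort"
)

// objectPartInfo is a simplified stand-in for the part fields used above.
type objectPartInfo struct {
	Number int
	Name   string
	ETag   string
	Size   int64
}

// meta is an illustrative stand-in for fsMetaV1/xlMetaV1.
type meta struct{ Parts []objectPartInfo }

// ObjectPartIndex returns the index of the matching part number, or -1.
func (m meta) ObjectPartIndex(partNumber int) int {
	for i, part := range m.Parts {
		if partNumber == part.Number {
			return i
		}
	}
	return -1
}

// AddObjectPart updates an existing part in place or appends a new one,
// then re-sorts so Parts stay ordered by part number.
func (m *meta) AddObjectPart(partNumber int, partName, partETag string, partSize int64) {
	partInfo := objectPartInfo{Number: partNumber, Name: partName, ETag: partETag, Size: partSize}
	for i, part := range m.Parts {
		if partNumber == part.Number {
			m.Parts[i] = partInfo
			return
		}
	}
	m.Parts = append(m.Parts, partInfo)
	sort.Slice(m.Parts, func(i, j int) bool { return m.Parts[i].Number < m.Parts[j].Number })
}

func main() {
	var m meta
	m.AddObjectPart(2, "object2", "etag2", 10)
	m.AddObjectPart(1, "object1", "etag1", 5)
	m.AddObjectPart(2, "object2", "etag2-retry", 12) // re-upload replaces part 2
	fmt.Println(m.ObjectPartIndex(2), len(m.Parts))  // 1 2
}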
@ -63,9 +63,8 @@ func (fs fsObjects) newMultipartUploadCommon(bucket string, object string, meta
meta = make(map[string]string)
}
fsMeta := fsMetaV1{}
fsMeta.Format = "fs"
fsMeta.Version = "1"
// Initialize `fs.json` values.
fsMeta := newFSMetaV1()
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
@ -454,7 +453,7 @@ func (fs fsObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Only parts with higher part numbers will be listed.
partIdx := fsMeta.SearchObjectPart(partNumberMarker)
partIdx := fsMeta.ObjectPartIndex(partNumberMarker)
parts := fsMeta.Parts
if partIdx != -1 {
parts = fsMeta.Parts[partIdx+1:]
@ -642,7 +641,7 @@ func (fs fsObjects) abortMultipartUploadCommon(bucket, object, uploadID string)
// the object, if yes do not attempt to delete 'uploads.json'.
uploadIDs, err := getUploadIDs(bucket, object, fs.storage)
if err == nil {
uploadIDIdx := uploadIDs.SearchUploadID(uploadID)
uploadIDIdx := uploadIDs.Index(uploadID)
if uploadIDIdx != -1 {
uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...)
}

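The uploads.json handling above looks an upload up with uploadIDs.Index and, on abort or complete, splices it out of the slice. A small standalone sketch of that idiom with simplified stand-in types (this uploadInfo and uploadsV1 are illustrative, not the real structures):

package main

import (
	"fmt"
	"time"
)

// uploadInfo / uploadsV1 are simplified stand-ins for the uploads.json
// structures referenced above.
type uploadInfo struct {
	UploadID  string
	Initiated time.Time
}

type uploadsV1 struct{ Uploads []uploadInfo }

// Index returns the position of the matching upload id, or -1.
func (u uploadsV1) Index(uploadID string) int {
	for i, upload := range u.Uploads {
		if upload.UploadID == uploadID {
			return i
		}
	}
	return -1
}

func main() {
	u := uploadsV1{Uploads: []uploadInfo{
		{UploadID: "a1"}, {UploadID: "b2"}, {UploadID: "c3"},
	}}
	// Same deletion idiom as abortMultipartUploadCommon: splice the entry out.
	if idx := u.Index("b2"); idx != -1 {
		u.Uploads = append(u.Uploads[:idx], u.Uploads[idx+1:]...)
	}
	fmt.Println(len(u.Uploads), u.Index("b2")) // 2 -1
}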
@ -28,13 +28,10 @@ const (
)
// InvalidRange - invalid range
type InvalidRange struct {
Start int64
Length int64
}
type InvalidRange struct{}
func (e InvalidRange) Error() string {
return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length)
return "The requested range is not satisfiable"
}
// HttpRange specifies the byte range to be sent to the client.

@ -136,7 +136,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(w, r, apiErr, r.URL.Path)
return
}
defer readCloser.Close() // Close after this handler returns.
defer readCloser.Close()
// Set standard object headers.
setObjectHeaders(w, objInfo, hrange)

@ -54,9 +54,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
// Return the first success entry based on the selected random disk.
for listErrCount < len(xl.storageDisks) {
// Choose a random disk on each attempt, do not hit the same disk all the time.
randIndex := rand.Intn(len(xl.storageDisks) - 1)
disk := xl.storageDisks[randIndex] // Pick a random disk.
disk := xl.getRandomDisk() // Choose a random disk on each attempt.
if entries, err = disk.ListDir(bucket, prefixDir); err == nil {
// Skip the entries which do not match the filter.
for i, entry := range entries {
@ -85,6 +83,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
// getRandomDisk - gives a random disk at any point in time from the
// available pool of disks.
func (xl xlObjects) getRandomDisk() (disk StorageAPI) {
rand.Seed(time.Now().UTC().UnixNano())
randIndex := rand.Intn(len(xl.storageDisks) - 1)
disk = xl.storageDisks[randIndex] // Pick a random disk.
return disk

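A side note on getRandomDisk above: rand.Intn(n) returns a value in [0, n), so rand.Intn(len(xl.storageDisks) - 1) can never select the last disk and panics when only one disk is configured, and re-seeding the global source on every call is unnecessary. A hedged standalone sketch of a uniform picker (this StorageAPI and disk are simplified stand-ins, not the real interface):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// StorageAPI stands in for the real storage interface (simplified here).
type StorageAPI interface{ String() string }

type disk string

func (d disk) String() string { return string(d) }

// pickRandomDisk selects uniformly from all disks. rand.Intn(n) already
// covers the half-open range [0, n), so the slice length is passed as-is.
func pickRandomDisk(r *rand.Rand, disks []StorageAPI) StorageAPI {
	return disks[r.Intn(len(disks))]
}

func main() {
	// Seed once, not on every call.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	disks := []StorageAPI{disk("disk1"), disk("disk2"), disk("disk3")}
	fmt.Println(pickRandomDisk(r, disks))
}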
@ -64,234 +64,102 @@ func (xl xlObjects) MakeBucket(bucket string) error {
if volumeExistsErrCnt == len(xl.storageDisks) {
return toObjectErr(errVolumeExists, bucket)
} else if createVolErr > len(xl.storageDisks)-xl.writeQuorum {
// Return errWriteQuorum if errors were more than
// allowed write quorum.
// Return errWriteQuorum if errors were more than allowed write quorum.
return toObjectErr(errWriteQuorum, bucket)
}
return nil
}
// getAllBucketInfo - list bucket info from all disks.
// Returns error slice indicating the failed volume stat operations.
func (xl xlObjects) getAllBucketInfo(bucketName string) ([]BucketInfo, []error) {
// Create errs and volInfo slices of storageDisks size.
var errs = make([]error, len(xl.storageDisks))
var volsInfo = make([]VolInfo, len(xl.storageDisks))
// getBucketInfo - returns the BucketInfo from one of the disks picked
// at random.
func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) {
// Count for errors encountered.
var bucketErrCount = 0
// Allocate a new waitgroup.
var wg = &sync.WaitGroup{}
for index, disk := range xl.storageDisks {
wg.Add(1)
// Stat volume on all the disks in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
volInfo, err := disk.StatVol(bucketName)
if err != nil {
errs[index] = err
return
}
volsInfo[index] = volInfo
errs[index] = nil
}(index, disk)
}
// Wait for all the Stat operations to finish.
wg.Wait()
// Return the concocted values.
var bucketsInfo = make([]BucketInfo, len(xl.storageDisks))
for _, volInfo := range volsInfo {
if IsValidBucketName(volInfo.Name) {
bucketsInfo = append(bucketsInfo, BucketInfo{
// Return the first successful lookup from a random list of disks.
for bucketErrCount < len(xl.storageDisks) {
disk := xl.getRandomDisk() // Choose a random disk on each attempt.
var volInfo VolInfo
volInfo, err = disk.StatVol(bucketName)
if err == nil {
bucketInfo = BucketInfo{
Name: volInfo.Name,
Created: volInfo.Created,
})
}
}
return bucketsInfo, errs
}
// listAllBucketInfo - list all stat volume info from all disks.
// Returns
// - stat volume info for all online disks.
// - boolean to indicate if healing is necessary.
// - error if any.
func (xl xlObjects) listAllBucketInfo(bucketName string) ([]BucketInfo, bool, error) {
bucketsInfo, errs := xl.getAllBucketInfo(bucketName)
notFoundCount := 0
for _, err := range errs {
if err == errVolumeNotFound {
notFoundCount++
// If we have errors with file not found greater than allowed read
// quorum we return err as errFileNotFound.
if notFoundCount > len(xl.storageDisks)-xl.readQuorum {
return nil, false, errVolumeNotFound
}
return bucketInfo, nil
}
bucketErrCount++ // Update error count.
}
// Calculate online disk count.
onlineDiskCount := 0
for index := range errs {
if errs[index] == nil {
onlineDiskCount++
}
}
var heal bool
// If online disks count is lesser than configured disks, most
// probably we need to heal the file, additionally verify if the
// count is lesser than readQuorum, if not we throw an error.
if onlineDiskCount < len(xl.storageDisks) {
// Online disks lesser than total storage disks, needs to be
// healed. unless we do not have readQuorum.
heal = true
// Verify if online disks count are lesser than readQuorum
// threshold, return an error if yes.
if onlineDiskCount < xl.readQuorum {
return nil, false, errReadQuorum
}
}
// Return success.
return bucketsInfo, heal, nil
return BucketInfo{}, err
}
// Checks whether bucket exists.
func (xl xlObjects) isBucketExist(bucketName string) bool {
func (xl xlObjects) isBucketExist(bucket string) bool {
nsMutex.RLock(bucket, "")
defer nsMutex.RUnlock(bucket, "")
// Check whether bucket exists.
_, _, err := xl.listAllBucketInfo(bucketName)
_, err := xl.getBucketInfo(bucket)
if err != nil {
if err == errVolumeNotFound {
return false
}
errorIf(err, "Stat failed on bucket "+bucketName+".")
errorIf(err, "Stat failed on bucket "+bucket+".")
return false
}
return true
}
// GetBucketInfo - get bucket info.
// GetBucketInfo - returns BucketInfo for a bucket.
func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketInfo{}, BucketNameInvalid{Bucket: bucket}
}
nsMutex.RLock(bucket, "")
defer nsMutex.RUnlock(bucket, "")
// List and figured out if we need healing.
bucketsInfo, heal, err := xl.listAllBucketInfo(bucket)
bucketInfo, err := xl.getBucketInfo(bucket)
if err != nil {
return BucketInfo{}, toObjectErr(err, bucket)
}
// Heal for missing entries.
if heal {
go func() {
// Create bucket if missing on disks.
for index, bktInfo := range bucketsInfo {
if bktInfo.Name != "" {
continue
}
// Bucketinfo name would be an empty string, create it.
xl.storageDisks[index].MakeVol(bucket)
}
}()
}
// From all bucketsInfo, calculate the actual usage values.
var total, free int64
var bucketInfo BucketInfo
for _, bucketInfo = range bucketsInfo {
if bucketInfo.Name == "" {
continue
}
free += bucketInfo.Free
total += bucketInfo.Total
}
// Update the aggregated values.
bucketInfo.Free = free
bucketInfo.Total = total
return BucketInfo{
Name: bucket,
Created: bucketInfo.Created,
Total: bucketInfo.Total,
Free: bucketInfo.Free,
}, nil
return bucketInfo, nil
}
func (xl xlObjects) listBuckets() ([]BucketInfo, error) {
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
// Success vols map carries successful results of ListVols from each disks.
var successVols = make([][]VolInfo, len(xl.storageDisks))
for index, disk := range xl.storageDisks {
wg.Add(1) // Add each go-routine to wait for.
go func(index int, disk StorageAPI) {
// Indicate wait group as finished.
defer wg.Done()
// listBuckets - returns list of all buckets from a disk picked at random.
func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) {
// Count for errors encountered.
var listBucketsErrCount = 0
// Initiate listing.
volsInfo, _ := disk.ListVols()
successVols[index] = volsInfo
}(index, disk)
}
// Wait for all the list volumes running in parallel to finish.
wg.Wait()
// From success vols map calculate aggregated usage values.
var volsInfo []VolInfo
var total, free int64
for _, volsInfo = range successVols {
var volInfo VolInfo
for _, volInfo = range volsInfo {
if volInfo.Name == "" {
continue
}
if !IsValidBucketName(volInfo.Name) {
continue
// Return the first successful lookup from a random list of disks.
for listBucketsErrCount < len(xl.storageDisks) {
disk := xl.getRandomDisk() // Choose a random disk on each attempt.
var volsInfo []VolInfo
volsInfo, err = disk.ListVols()
if err == nil {
// NOTE: The assumption here is that volumes across all disks in
// readQuorum have consistent view i.e they all have same number
// of buckets. This is essentially not verified since healing
// should take care of this.
var bucketsInfo []BucketInfo
for _, volInfo := range volsInfo {
// StorageAPI can send volume names which are incompatible
// with buckets, handle it and skip them.
if !IsValidBucketName(volInfo.Name) {
continue
}
bucketsInfo = append(bucketsInfo, BucketInfo{
Name: volInfo.Name,
Created: volInfo.Created,
})
}
break
}
free += volInfo.Free
total += volInfo.Total
}
// Save the updated usage values back into the vols.
for index, volInfo := range volsInfo {
volInfo.Free = free
volInfo.Total = total
volsInfo[index] = volInfo
}
// NOTE: The assumption here is that volumes across all disks in
// readQuorum have consistent view i.e they all have same number
// of buckets. This is essentially not verified since healing
// should take care of this.
var bucketsInfo []BucketInfo
for _, volInfo := range volsInfo {
// StorageAPI can send volume names which are incompatible
// with buckets, handle it and skip them.
if !IsValidBucketName(volInfo.Name) {
continue
return bucketsInfo, nil
}
bucketsInfo = append(bucketsInfo, BucketInfo{
Name: volInfo.Name,
Created: volInfo.Created,
Total: volInfo.Total,
Free: volInfo.Free,
})
listBucketsErrCount++ // Update error count.
}
return bucketsInfo, nil
return nil, err
}
// ListBuckets - list buckets.
// ListBuckets - lists all the buckets, sorted by name.
func (xl xlObjects) ListBuckets() ([]BucketInfo, error) {
bucketInfos, err := xl.listBuckets()
if err != nil {
@ -302,7 +170,7 @@ func (xl xlObjects) ListBuckets() ([]BucketInfo, error) {
return bucketInfos, nil
}
// DeleteBucket - delete a bucket.
// DeleteBucket - deletes a bucket.
func (xl xlObjects) DeleteBucket(bucket string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {

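getBucketInfo and listBuckets above follow the same shape: loop up to len(xl.storageDisks) times, pick a random disk on each attempt, and return the first successful result, otherwise fall back to the last error seen. A standalone sketch of that pattern with illustrative names (firstSuccess and statFn are not part of the codebase), noting that random selection with replacement may retry the same disk:

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// statFn stands in for a per-disk operation such as StatVol or ListVols
// (simplified for illustration).
type statFn func(diskIndex int) (string, error)

// firstSuccess mirrors the lookup pattern used above: try up to diskCount
// random disks and return the first successful result.
func firstSuccess(diskCount int, op statFn) (string, error) {
	var err error
	for attempt := 0; attempt < diskCount; attempt++ {
		idx := rand.Intn(diskCount) // choose a random disk on each attempt
		var result string
		if result, err = op(idx); err == nil {
			return result, nil
		}
	}
	return "", err // last error seen
}

func main() {
	out, err := firstSuccess(4, func(i int) (string, error) {
		if i == 0 {
			return "", errors.New("disk offline")
		}
		return fmt.Sprintf("volInfo from disk %d", i), nil
	})
	fmt.Println(out, err)
}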
@ -100,63 +100,73 @@ func (t byPartNumber) Len() int { return len(t) }
func (t byPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
// SearchObjectPart - searches for part name and etag, returns the
// index if found.
func (m xlMetaV1) SearchObjectPart(number int) int {
// ObjectPartIndex - returns the index of the matching object part number.
func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) {
for i, part := range m.Parts {
if number == part.Number {
return i
if partNumber == part.Number {
index = i
return index
}
}
return -1
}
// AddObjectPart - add a new object part in order.
func (m *xlMetaV1) AddObjectPart(number int, name string, etag string, size int64) {
func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) {
partInfo := objectPartInfo{
Number: number,
Name: name,
ETag: etag,
Size: size,
Number: partNumber,
Name: partName,
ETag: partETag,
Size: partSize,
}
// Update part info if it already exists.
for i, part := range m.Parts {
if number == part.Number {
if partNumber == part.Number {
m.Parts[i] = partInfo
return
}
}
// Proceed to include new part info.
m.Parts = append(m.Parts, partInfo)
// Parts in xlMeta should be in sorted order by part number.
sort.Sort(byPartNumber(m.Parts))
}
// getPartIndexOffset - given an offset for the whole object, return the part and offset in that part.
func (m xlMetaV1) getPartIndexOffset(offset int64) (partIndex int, partOffset int64, err error) {
// objectToPartOffset - translate offset of an object to offset of its individual part.
func (m xlMetaV1) objectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) {
partOffset = offset
// Seek until object offset maps to a particular part offset.
for i, part := range m.Parts {
partIndex = i
// Last part can be of '0' bytes, treat it specially and
// return right here.
if part.Size == 0 {
return partIndex, partOffset, nil
}
// Offset is smaller than size we have reached the proper part offset.
if partOffset < part.Size {
return partIndex, partOffset, nil
}
// Continue towards the next part.
partOffset -= part.Size
}
// Offset beyond the size of the object
err = errUnexpected
return 0, 0, err
// Offset is beyond the size of the object; return InvalidRange.
return 0, 0, InvalidRange{}
}
// readXLMetadata - read xl metadata.
// readXLMetadata - returns the object metadata `xl.json` content from
// one of the disks picked at random.
func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) {
// Count for errors encountered.
var xlJSONErrCount = 0
// Return the first success entry based on the selected random disk.
// Return the first successful lookup from a random list of disks.
for xlJSONErrCount < len(xl.storageDisks) {
var r io.ReadCloser
// Choose a random disk on each attempt, do not hit the same disk all the time.
disk := xl.getRandomDisk() // Pick a random disk.
disk := xl.getRandomDisk() // Choose a random disk on each attempt.
r, err = disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), int64(0))
if err == nil {
defer r.Close()
@ -170,23 +180,29 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
return xlMetaV1{}, err
}
// writeXLJson - write `xl.json` on all disks in order.
func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
// Initialize metadata map, save all erasure related metadata.
// newXLMetaV1 - initializes new xlMetaV1.
func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
xlMeta = xlMetaV1{}
xlMeta.Version = "1"
xlMeta.Format = "xl"
xlMeta.Minio.Release = minioReleaseTag
xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost
xlMeta.Erasure.DataBlocks = xl.dataBlocks
xlMeta.Erasure.ParityBlocks = xl.parityBlocks
xlMeta.Erasure.DataBlocks = dataBlocks
xlMeta.Erasure.ParityBlocks = parityBlocks
xlMeta.Erasure.BlockSize = erasureBlockSize
xlMeta.Erasure.Distribution = xl.getDiskDistribution()
xlMeta.Erasure.Distribution = randErasureDistribution(dataBlocks + parityBlocks)
return xlMeta
}
// writeXLMetadata - write `xl.json` on all disks in order.
func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
// Start writing `xl.json` to all disks in parallel.
for index, disk := range xl.storageDisks {
wg.Add(1)
// Write `xl.json` in a routine.
go func(index int, disk StorageAPI, metadata xlMetaV1) {
defer wg.Done()
@ -197,8 +213,10 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
return
}
// Save the order.
// Save the disk order index.
metadata.Erasure.Index = index + 1
// Marshal metadata to the writer.
_, mErr = metadata.WriteTo(metaWriter)
if mErr != nil {
if mErr = safeCloseAndRemove(metaWriter); mErr != nil {
@ -208,6 +226,7 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
mErrs[index] = mErr
return
}
// Verify if close fails with an error.
if mErr = metaWriter.Close(); mErr != nil {
if mErr = safeCloseAndRemove(metaWriter); mErr != nil {
mErrs[index] = mErr
@ -223,7 +242,6 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
// Wait for all the routines.
wg.Wait()
// FIXME: check for quorum.
// Return the first error.
for _, err := range mErrs {
if err == nil {
@ -234,11 +252,18 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
return nil
}
// getDiskDistribution - get disk distribution.
func (xl xlObjects) getDiskDistribution() []int {
var distribution = make([]int, len(xl.storageDisks))
for index := range xl.storageDisks {
distribution[index] = index + 1
// randErasureDistribution - uses Knuth Fisher-Yates shuffle algorithm.
func randErasureDistribution(numBlocks int) []int {
distribution := make([]int, numBlocks)
for i := 0; i < numBlocks; i++ {
distribution[i] = i + 1
}
/*
for i := 0; i < numBlocks; i++ {
// Choose index uniformly in [i, numBlocks-1]
r := i + rand.Intn(numBlocks-i)
distribution[r], distribution[i] = distribution[i], distribution[r]
}
*/
return distribution
}

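randErasureDistribution above is documented as a Knuth Fisher-Yates shuffle, but in this revision the shuffle itself is commented out, so the returned distribution is simply 1..numBlocks in order. For reference, a standalone sketch of the shuffle as written in the commented block:

package main

import (
	"fmt"
	"math/rand"
)

// randErasureDistribution returns a random permutation of 1..numBlocks
// using the Fisher-Yates shuffle (illustrative version; the committed
// code keeps the shuffle disabled and returns 1..numBlocks in order).
func randErasureDistribution(numBlocks int) []int {
	distribution := make([]int, numBlocks)
	for i := 0; i < numBlocks; i++ {
		distribution[i] = i + 1
	}
	for i := 0; i < numBlocks; i++ {
		// Choose an index uniformly in [i, numBlocks-1] and swap.
		r := i + rand.Intn(numBlocks-i)
		distribution[r], distribution[i] = distribution[i], distribution[r]
	}
	return distribution
}

func main() {
	fmt.Println(randErasureDistribution(8)) // e.g. [3 1 8 5 2 7 4 6]
}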
@ -60,7 +60,8 @@ func (u *uploadsV1) AddUploadID(uploadID string, initiated time.Time) {
sort.Sort(byInitiatedTime(u.Uploads))
}
func (u uploadsV1) SearchUploadID(uploadID string) int {
// Index - returns the index of the matching upload id.
func (u uploadsV1) Index(uploadID string) int {
for i, u := range u.Uploads {
if u.UploadID == uploadID {
return i
@ -90,7 +91,7 @@ func (u uploadsV1) WriteTo(writer io.Writer) (n int64, err error) {
return int64(m), err
}
// getUploadIDs - get saved upload id's.
// getUploadIDs - get all the saved upload id's.
func getUploadIDs(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) {
uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
var errs = make([]error, len(storageDisks))
@ -258,7 +259,7 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora
// listUploadsInfo - list all uploads info.
func (xl xlObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) {
disk := xl.getRandomDisk()
disk := xl.getRandomDisk() // Choose a random disk on each attempt.
splitPrefixes := strings.SplitN(prefixPath, "/", 3)
uploadIDs, err := getUploadIDs(splitPrefixes[1], splitPrefixes[2], disk)
if err != nil {

@ -57,7 +57,7 @@ func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta
meta = make(map[string]string)
}
xlMeta := xlMetaV1{}
xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks)
// If not set default to "application/octet-stream"
if meta["content-type"] == "" {
contentType := "application/octet-stream"
@ -125,11 +125,18 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// List all online disks.
onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Increment version only if we have online disks less than configured storage disks.
if diskCount(onlineDisks) < len(xl.storageDisks) {
higherVersion++
}
@ -193,21 +200,18 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
return "", err
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
xlMeta.Stat.Version = higherVersion
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, partPath)
}
if err = xl.writeXLMetadata(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID), xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID))
// Once part is successfully committed, proceed with updating XL metadata.
xlMeta.Stat.Version = higherVersion
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
if err = xl.writeXLMetadata(minioMetaBucket, uploadIDPath, xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
return newMD5Hex, nil
}
@ -261,7 +265,7 @@ func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
}
// Only parts with higher part numbers will be listed.
partIdx := xlMeta.SearchObjectPart(partNumberMarker)
partIdx := xlMeta.ObjectPartIndex(partNumberMarker)
parts := xlMeta.Parts
if partIdx != -1 {
parts = xlMeta.Parts[partIdx+1:]
@ -349,7 +353,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Loop through all parts, validate them and then commit to disk.
for i, part := range parts {
partIdx := currentXLMeta.SearchObjectPart(part.PartNumber)
partIdx := currentXLMeta.ObjectPartIndex(part.PartNumber)
if partIdx == -1 {
return "", InvalidPart{}
}
@ -414,7 +418,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// the object, if yes do not attempt to delete 'uploads.json'.
uploadIDs, err := getUploadIDs(bucket, object, xl.storageDisks...)
if err == nil {
uploadIDIdx := uploadIDs.SearchUploadID(uploadID)
uploadIDIdx := uploadIDs.Index(uploadID)
if uploadIDIdx != -1 {
uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...)
}
@ -435,8 +439,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return s3MD5, nil
}
// abortMultipartUploadCommon - aborts a multipart upload, common
// function used by both object layers.
// abortMultipartUploadCommon - aborts a multipart upload, common function used by both object layers.
func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
@ -465,7 +468,7 @@ func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string)
// the object, if yes do not attempt to delete 'uploads.json'.
uploadIDs, err := getUploadIDs(bucket, object, xl.storageDisks...)
if err == nil {
uploadIDIdx := uploadIDs.SearchUploadID(uploadID)
uploadIDIdx := uploadIDs.Index(uploadID)
if uploadIDIdx != -1 {
uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...)
}

@ -13,6 +13,12 @@ import (
"github.com/minio/minio/pkg/mimedb"
)
// nullReadCloser - returns 0 bytes and io.EOF upon first read attempt.
type nullReadCloser struct{}
func (n nullReadCloser) Read([]byte) (int, error) { return 0, io.EOF }
func (n nullReadCloser) Close() error { return nil }
/// Object Operations
// GetObject - get an object.
@ -35,16 +41,19 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
// List all online disks.
onlineDisks, _, err := xl.listOnlineDisks(bucket, object)
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
// For zero byte files, return a null reader.
if xlMeta.Stat.Size == 0 {
return nullReadCloser{}, nil
}
erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks
// Get part index offset.
partIndex, offset, err := xlMeta.getPartIndexOffset(startOffset)
partIndex, partOffset, err := xlMeta.objectToPartOffset(startOffset)
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
@ -59,13 +68,14 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
defer nsMutex.RUnlock(bucket, object)
for ; partIndex < len(xlMeta.Parts); partIndex++ {
part := xlMeta.Parts[partIndex]
r, err := erasure.ReadFile(bucket, pathJoin(object, part.Name), offset, part.Size)
r, err := erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size)
if err != nil {
fileWriter.CloseWithError(toObjectErr(err, bucket, object))
return
}
// Reset offset to 0 as it would be non-0 only for the first loop if startOffset is non-0.
offset = 0
// Reset part offset to 0 to read rest of the parts from
// the beginning.
partOffset = 0
if _, err = io.Copy(fileWriter, r); err != nil {
switch reader := r.(type) {
case *io.PipeReader:
@ -76,7 +86,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
fileWriter.CloseWithError(toObjectErr(err, bucket, object))
return
}
// Close the readerCloser that reads multiparts of an object from the xl storage layer.
// Close the readerCloser that reads multiparts of an object.
// Not closing leaks underlying file descriptors.
r.Close()
}
@ -198,12 +208,13 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1")
tempObj := path.Join(tmpMetaPrefix, bucket, object)
// List all online disks.
onlineDisks, higherVersion, err := xl.listOnlineDisks(bucket, object)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Increment version only if we have online disks less than configured storage disks.
if diskCount(onlineDisks) < len(xl.storageDisks) {
// Increment version only if we have online disks less than configured storage disks.
higherVersion++
}
erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks
@ -290,7 +301,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
return "", toObjectErr(err, bucket, object)
}
xlMeta := xlMetaV1{}
xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks)
xlMeta.Meta = metadata
xlMeta.Stat.Size = size
xlMeta.Stat.ModTime = modTime

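GetObject above first maps startOffset to a (partIndex, partOffset) pair via objectToPartOffset, reads the first part from that offset, and then resets partOffset to 0 for every subsequent part. A standalone sketch of that translation with simplified types (a plain []int64 of part sizes instead of xlMeta.Parts, and a local error in place of InvalidRange):

package main

import (
	"errors"
	"fmt"
)

var errInvalidRange = errors.New("the requested range is not satisfiable")

// objectToPartOffset walks the parts, subtracting each part's size until
// the remaining offset falls inside a part.
func objectToPartOffset(partSizes []int64, offset int64) (partIndex int, partOffset int64, err error) {
	partOffset = offset
	for i, size := range partSizes {
		partIndex = i
		// A zero-byte (last) part maps any remaining offset to itself.
		if size == 0 {
			return partIndex, partOffset, nil
		}
		if partOffset < size {
			return partIndex, partOffset, nil
		}
		partOffset -= size
	}
	return 0, 0, errInvalidRange
}

func main() {
	sizes := []int64{5 * 1024 * 1024, 5 * 1024 * 1024, 1024}
	idx, off, _ := objectToPartOffset(sizes, 6*1024*1024)
	fmt.Println(idx, off) // 1 1048576: offset lands 1 MiB into part 2
	// Subsequent parts are then read from the beginning (partOffset reset
	// to 0), matching the loop in GetObject above.
}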
@ -73,8 +73,8 @@ func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err
// Return the first success entry based on the selected random disk.
for xlJSONErrCount < len(xl.storageDisks) {
// Choose a random disk on each attempt, do not hit the same disk all the time.
disk := xl.getRandomDisk() // Pick a random disk.
// Choose a random disk on each attempt.
disk := xl.getRandomDisk()
fileInfo, err = disk.StatFile(bucket, objectPart)
if err == nil {
return fileInfo, nil
