Merge pull request #642 from harshavardhana/multipartfixes

fs multipart fixes
master
Harshavardhana 10 years ago
commit 765fb08e20
  1. 60
      pkg/storage/drivers/fs/fs_multipart.go
  2. 8
      pkg/storage/drivers/fs/fs_object.go

@ -5,8 +5,8 @@ import (
"crypto/md5" "crypto/md5"
"crypto/sha512" "crypto/sha512"
"encoding/base64" "encoding/base64"
"encoding/gob"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -73,7 +73,7 @@ func (fs *fsDriver) ListMultipartUploads(bucket string, resources drivers.Bucket
defer activeSessionFile.Close() defer activeSessionFile.Close()
var deserializedActiveSession map[string]*MultipartSession var deserializedActiveSession map[string]*MultipartSession
decoder := gob.NewDecoder(activeSessionFile) decoder := json.NewDecoder(activeSessionFile)
err = decoder.Decode(&deserializedActiveSession) err = decoder.Decode(&deserializedActiveSession)
if err != nil { if err != nil {
return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil) return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil)
@ -142,21 +142,6 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string,
if err != nil { if err != nil {
return "", iodine.New(drivers.InternalError{}, nil) return "", iodine.New(drivers.InternalError{}, nil)
} }
var activeSessionFile *os.File
if _, err := os.Stat(bucketPath + "$activeSession"); os.IsNotExist(err) {
activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return "", iodine.New(err, nil)
}
} else {
activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600)
if err != nil {
return "", iodine.New(err, nil)
}
}
defer activeSessionFile.Close()
objectPath := path.Join(bucketPath, key) objectPath := path.Join(bucketPath, key)
objectDir := path.Dir(objectPath) objectDir := path.Dir(objectPath)
if _, err := os.Stat(objectDir); os.IsNotExist(err) { if _, err := os.Stat(objectDir); os.IsNotExist(err) {
@ -174,6 +159,20 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string,
}, nil) }, nil)
} }
var activeSessionFile *os.File
if _, err := os.Stat(bucketPath + "$activeSession"); os.IsNotExist(err) {
activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return "", iodine.New(err, nil)
}
} else {
activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600)
if err != nil {
return "", iodine.New(err, nil)
}
}
defer activeSessionFile.Close()
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
uploadIDSum := sha512.Sum512(id) uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
@ -192,12 +191,12 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string,
mpartSession.Parts = parts mpartSession.Parts = parts
fs.multiparts.ActiveSession[key] = mpartSession fs.multiparts.ActiveSession[key] = mpartSession
encoder := gob.NewEncoder(file) encoder := json.NewEncoder(file)
err = encoder.Encode(mpartSession) err = encoder.Encode(mpartSession)
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
encoder = gob.NewEncoder(activeSessionFile) encoder = json.NewEncoder(activeSessionFile)
err = encoder.Encode(fs.multiparts.ActiveSession) err = encoder.Encode(fs.multiparts.ActiveSession)
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
@ -329,17 +328,17 @@ func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, c
defer multiPartfile.Close() defer multiPartfile.Close()
var deserializedMultipartSession MultipartSession var deserializedMultipartSession MultipartSession
decoder := gob.NewDecoder(multiPartfile) decoder := json.NewDecoder(multiPartfile)
err = decoder.Decode(&deserializedMultipartSession) err = decoder.Decode(&deserializedMultipartSession)
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata) deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata)
deserializedMultipartSession.TotalParts++ deserializedMultipartSession.TotalParts++
fs.multiparts.ActiveSession[uploadID] = &deserializedMultipartSession fs.multiparts.ActiveSession[key] = &deserializedMultipartSession
sort.Sort(partNumber(deserializedMultipartSession.Parts)) sort.Sort(partNumber(deserializedMultipartSession.Parts))
encoder := gob.NewEncoder(multiPartfile) encoder := json.NewEncoder(multiPartfile)
err = encoder.Encode(&deserializedMultipartSession) err = encoder.Encode(&deserializedMultipartSession)
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
@ -425,7 +424,7 @@ func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts
} }
md5sum := hex.EncodeToString(h.Sum(nil)) md5sum := hex.EncodeToString(h.Sum(nil))
delete(fs.multiparts.ActiveSession, uploadID) delete(fs.multiparts.ActiveSession, key)
for partNumber := range parts { for partNumber := range parts {
err = os.Remove(objectPath + fmt.Sprintf("$%d", partNumber)) err = os.Remove(objectPath + fmt.Sprintf("$%d", partNumber))
if err != nil { if err != nil {
@ -447,19 +446,20 @@ func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts
ContentType: "application/octet-stream", ContentType: "application/octet-stream",
Md5sum: h.Sum(nil), Md5sum: h.Sum(nil),
} }
// serialize metadata to gob // serialize metadata to json
encoder := gob.NewEncoder(file) encoder := json.NewEncoder(file)
err = encoder.Encode(metadata) err = encoder.Encode(metadata)
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
activeSessionFile, err := os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600) activeSessionFile, err := os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
defer activeSessionFile.Close() defer activeSessionFile.Close()
encoder = gob.NewEncoder(activeSessionFile) fmt.Println(fs.multiparts.ActiveSession)
encoder = json.NewEncoder(activeSessionFile)
err = encoder.Encode(fs.multiparts.ActiveSession) err = encoder.Encode(fs.multiparts.ActiveSession)
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
@ -514,7 +514,7 @@ func (fs *fsDriver) ListObjectParts(bucket, key string, resources drivers.Object
defer multiPartfile.Close() defer multiPartfile.Close()
var deserializedMultipartSession MultipartSession var deserializedMultipartSession MultipartSession
decoder := gob.NewDecoder(multiPartfile) decoder := json.NewDecoder(multiPartfile)
err = decoder.Decode(&deserializedMultipartSession) err = decoder.Decode(&deserializedMultipartSession)
if err != nil { if err != nil {
return drivers.ObjectResourcesMetadata{}, iodine.New(err, nil) return drivers.ObjectResourcesMetadata{}, iodine.New(err, nil)
@ -570,14 +570,14 @@ func (fs *fsDriver) AbortMultipartUpload(bucket, key, uploadID string) error {
} }
var deserializedMultipartSession MultipartSession var deserializedMultipartSession MultipartSession
decoder := gob.NewDecoder(multiPartfile) decoder := json.NewDecoder(multiPartfile)
err = decoder.Decode(&deserializedMultipartSession) err = decoder.Decode(&deserializedMultipartSession)
if err != nil { if err != nil {
return iodine.New(err, nil) return iodine.New(err, nil)
} }
multiPartfile.Close() // close it right here, since we will delete it subsequently multiPartfile.Close() // close it right here, since we will delete it subsequently
delete(fs.multiparts.ActiveSession, uploadID) delete(fs.multiparts.ActiveSession, key)
for _, part := range deserializedMultipartSession.Parts { for _, part := range deserializedMultipartSession.Parts {
err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber)) err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber))
if err != nil { if err != nil {

@ -25,8 +25,8 @@ import (
"crypto/md5" "crypto/md5"
"encoding/base64" "encoding/base64"
"encoding/gob"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/iodine"
@ -154,7 +154,7 @@ func (fs *fsDriver) GetObjectMetadata(bucket, object string) (drivers.ObjectMeta
} }
var deserializedMetadata Metadata var deserializedMetadata Metadata
decoder := gob.NewDecoder(file) decoder := json.NewDecoder(file)
err = decoder.Decode(&deserializedMetadata) err = decoder.Decode(&deserializedMetadata)
if err != nil { if err != nil {
return drivers.ObjectMetadata{}, iodine.New(err, nil) return drivers.ObjectMetadata{}, iodine.New(err, nil)
@ -280,8 +280,8 @@ func (fs *fsDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string
ContentType: contentType, ContentType: contentType,
Md5sum: h.Sum(nil), Md5sum: h.Sum(nil),
} }
// serialize metadata to gob // serialize metadata to json
encoder := gob.NewEncoder(file) encoder := json.NewEncoder(file)
err = encoder.Encode(metadata) err = encoder.Encode(metadata)
md5Sum := hex.EncodeToString(metadata.Md5sum) md5Sum := hex.EncodeToString(metadata.Md5sum)

Loading…
Cancel
Save