Merge pull request #691 from harshavardhana/pr_out_handle_couple_of_cases_of_oom_conditions_move_caching_to_getobject_rather_than_putobject_

Handle a couple of cases of OOM conditions; move caching to GetObject() rather than PutObject()
master
Harshavardhana 10 years ago
commit b5a5861c8f
  1. pkg/storage/donut/bucket.go (51 changes)
  2. pkg/storage/drivers/donut/donut.go (40 changes)
  3. pkg/storage/drivers/donut/multipart.go (13 changes)

pkg/storage/donut/bucket.go

@@ -212,7 +212,7 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64,
 		return nil, 0, iodine.New(err, nil)
 	}
 	// read and reply back to GetObject() request in a go-routine
-	go b.readEncodedData(normalizeObjectName(objectName), writer, objMetadata)
+	go b.readObjectData(normalizeObjectName(objectName), writer, objMetadata)
 	return reader, objMetadata.Size, nil
 }
@@ -223,7 +223,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
 	if objectName == "" || objectData == nil {
 		return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
 	}
-	writers, err := b.getDiskWriters(normalizeObjectName(objectName), "data")
+	writers, err := b.getWriters(normalizeObjectName(objectName), "data")
 	if err != nil {
 		return ObjectMetadata{}, iodine.New(err, nil)
 	}
@@ -247,8 +247,8 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
 	if err != nil {
 		return ObjectMetadata{}, iodine.New(err, nil)
 	}
-	// encoded data with k, m and write
-	chunkCount, totalLength, err := b.writeEncodedData(k, m, writers, objectData, sumMD5, sum512)
+	// write encoded data with k, m and writers
+	chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, sumMD5, sum512)
 	if err != nil {
 		return ObjectMetadata{}, iodine.New(err, nil)
 	}
@@ -310,7 +310,7 @@ func (b bucket) writeObjectMetadata(objectName string, objMetadata ObjectMetadat
 	if objMetadata.Object == "" {
 		return iodine.New(InvalidArgument{}, nil)
 	}
-	objMetadataWriters, err := b.getDiskWriters(objectName, objectMetadataConfig)
+	objMetadataWriters, err := b.getWriters(objectName, objectMetadataConfig)
 	if err != nil {
 		return iodine.New(err, nil)
 	}
@@ -332,7 +332,7 @@ func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) {
 	if objectName == "" {
 		return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
 	}
-	objMetadataReaders, err := b.getDiskReaders(objectName, objectMetadataConfig)
+	objMetadataReaders, err := b.getReaders(objectName, objectMetadataConfig)
 	if err != nil {
 		return ObjectMetadata{}, iodine.New(err, nil)
 	}
@@ -378,8 +378,8 @@ func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error)
 	return k, m, nil
 }
-// writeEncodedData -
-func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum512 hash.Hash) (int, int, error) {
+// writeObjectData -
+func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum512 hash.Hash) (int, int, error) {
 	encoder, err := newEncoder(k, m, "Cauchy")
 	if err != nil {
 		return 0, 0, iodine.New(err, nil)
@@ -387,16 +387,17 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat
 	chunkCount := 0
 	totalLength := 0
 	for chunk := range split.Stream(objectData, 10*1024*1024) {
-		if chunk.Err == nil {
-			totalLength = totalLength + len(chunk.Data)
-			encodedBlocks, _ := encoder.Encode(chunk.Data)
-			sumMD5.Write(chunk.Data)
-			sum512.Write(chunk.Data)
-			for blockIndex, block := range encodedBlocks {
-				_, err := io.Copy(writers[blockIndex], bytes.NewBuffer(block))
-				if err != nil {
-					return 0, 0, iodine.New(err, nil)
-				}
+		if chunk.Err != nil {
+			return 0, 0, iodine.New(err, nil)
+		}
+		totalLength = totalLength + len(chunk.Data)
+		encodedBlocks, _ := encoder.Encode(chunk.Data)
+		sumMD5.Write(chunk.Data)
+		sum512.Write(chunk.Data)
+		for blockIndex, block := range encodedBlocks {
+			_, err := io.Copy(writers[blockIndex], bytes.NewBuffer(block))
+			if err != nil {
+				return 0, 0, iodine.New(err, nil)
 			}
 		}
 		chunkCount = chunkCount + 1
@@ -404,9 +405,9 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat
 	return chunkCount, totalLength, nil
 }
-// readEncodedData -
-func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) {
-	readers, err := b.getDiskReaders(objectName, "data")
+// readObjectData -
+func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) {
+	readers, err := b.getReaders(objectName, "data")
 	if err != nil {
 		writer.CloseWithError(iodine.New(err, nil))
 		return
@@ -490,8 +491,8 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadC
 	return decodedData, nil
 }
-// getDiskReaders -
-func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser, error) {
+// getReaders -
+func (b bucket) getReaders(objectName, objectMeta string) ([]io.ReadCloser, error) {
 	var readers []io.ReadCloser
 	nodeSlice := 0
 	for _, node := range b.nodes {
@@ -514,8 +515,8 @@ func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser,
 	return readers, nil
 }
-// getDiskWriters -
-func (b bucket) getDiskWriters(objectName, objectMeta string) ([]io.WriteCloser, error) {
+// getWriters -
+func (b bucket) getWriters(objectName, objectMeta string) ([]io.WriteCloser, error) {
 	var writers []io.WriteCloser
 	nodeSlice := 0
 	for _, node := range b.nodes {
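
The substantive change in bucket.go, beyond the renames, is the control flow inside writeObjectData: a failed chunk from split.Stream now aborts the write instead of being skipped silently. Below is a minimal standalone sketch of that corrected loop shape; chunk, encodeChunk and writeChunks are hypothetical stand-ins, since the real split.Stream, newEncoder and iodine packages are not part of this diff.

package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
	"hash"
	"io"
	"strings"
)

// chunk mimics what split.Stream delivers: a slice of data or an error.
type chunk struct {
	Data []byte
	Err  error
}

// encodeChunk is a hypothetical stand-in for encoder.Encode: it fans one chunk
// out into k+m blocks (no real erasure coding happens here).
func encodeChunk(data []byte, k, m uint8) [][]byte {
	blocks := make([][]byte, int(k)+int(m))
	for i := range blocks {
		blocks[i] = data
	}
	return blocks
}

// writeChunks mirrors the corrected loop shape: abort on a bad chunk instead
// of silently skipping it, otherwise hash the plaintext and copy each encoded
// block to its per-disk writer.
func writeChunks(chunks <-chan chunk, writers []io.Writer, sumMD5 hash.Hash, k, m uint8) (int, int, error) {
	chunkCount, totalLength := 0, 0
	for c := range chunks {
		if c.Err != nil {
			return 0, 0, c.Err // early return on a broken chunk
		}
		totalLength += len(c.Data)
		sumMD5.Write(c.Data)
		for i, block := range encodeChunk(c.Data, k, m) {
			if _, err := io.Copy(writers[i], bytes.NewReader(block)); err != nil {
				return 0, 0, err
			}
		}
		chunkCount++
	}
	return chunkCount, totalLength, nil
}

func main() {
	chunks := make(chan chunk, 2)
	chunks <- chunk{Data: []byte(strings.Repeat("a", 8))}
	chunks <- chunk{Data: []byte(strings.Repeat("b", 4))}
	close(chunks)

	sumMD5 := md5.New()
	writers := []io.Writer{&bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}} // k=2 data + m=1 parity
	count, length, err := writeChunks(chunks, writers, sumMD5, 2, 1)
	fmt.Printf("chunks=%d bytes=%d err=%v md5=%x\n", count, length, err, sumMD5.Sum(nil))
}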

pkg/storage/drivers/donut/donut.go

@@ -325,10 +325,16 @@ func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int6
 				return 0, iodine.New(drivers.InternalError{}, nil)
 			}
 		}
-		n, err := io.CopyN(w, reader, size)
+		pw := newProxyWriter(w)
+		n, err := io.CopyN(pw, reader, size)
 		if err != nil {
 			return 0, iodine.New(err, nil)
 		}
+		// Save in memory for future reads
+		d.objects.Set(objectKey, pw.writtenBytes)
+		// free up
+		pw.writtenBytes = nil
+		go debug.FreeOSMemory()
 		return n, nil
 	}
 	written, err := io.Copy(w, bytes.NewBuffer(data))
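
With this hunk the in-memory cache is filled on the read path: on a cache miss, GetObject streams the object to the client through a proxyWriter and then stores the captured bytes under the object key. A minimal sketch of that read-through flow follows; teeWriter, getObject and the plain map cache are stand-ins for the driver's proxyWriter and d.objects, whose concrete types are not shown in this diff.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// teeWriter captures everything written through it, like the PR's proxyWriter.
type teeWriter struct {
	w   io.Writer
	buf []byte
}

func (t *teeWriter) Write(p []byte) (int, error) {
	n, err := t.w.Write(p)
	if err != nil {
		return n, err
	}
	t.buf = append(t.buf, p[:n]...)
	return n, nil
}

// getObject streams from the backend on a cache miss and fills the cache as a
// side effect; later reads are served straight from memory. The map is a
// stand-in for the driver's real object cache (d.objects).
func getObject(w io.Writer, key string, backend io.Reader, size int64, cache map[string][]byte) (int64, error) {
	if data, ok := cache[key]; ok {
		return io.Copy(w, bytes.NewReader(data))
	}
	tw := &teeWriter{w: w}
	n, err := io.CopyN(tw, backend, size)
	if err != nil {
		return 0, err
	}
	cache[key] = tw.buf // save in memory for future reads
	return n, nil
}

func main() {
	cache := map[string][]byte{}
	payload := "hello donut"

	var first, second bytes.Buffer
	if _, err := getObject(&first, "bucket/object", strings.NewReader(payload), int64(len(payload)), cache); err != nil {
		panic(err)
	}
	// Second call hits the cache; the backend reader is not consulted at all.
	if _, err := getObject(&second, "bucket/object", nil, 0, cache); err != nil {
		panic(err)
	}
	fmt.Println(first.String(), "|", second.String())
}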
@@ -492,31 +498,22 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
 	return results, resources, nil
 }
-type proxyReader struct {
-	reader    io.Reader
-	readBytes []byte
+type proxyWriter struct {
+	writer       io.Writer
+	writtenBytes []byte
 }
-func (r *proxyReader) free(p []byte) {
-	p = nil
-	go debug.FreeOSMemory()
-}
-func (r *proxyReader) Read(p []byte) (n int, err error) {
-	defer r.free(p)
-	n, err = r.reader.Read(p)
-	if err == io.EOF || err == io.ErrUnexpectedEOF {
-		r.readBytes = append(r.readBytes, p[0:n]...)
-		return
-	}
+func (r *proxyWriter) Write(p []byte) (n int, err error) {
+	n, err = r.writer.Write(p)
 	if err != nil {
 		return
 	}
-	r.readBytes = append(r.readBytes, p[0:n]...)
+	r.writtenBytes = append(r.writtenBytes, p[0:n]...)
 	return
 }
-func newProxyReader(r io.Reader) *proxyReader {
-	return &proxyReader{reader: r, readBytes: nil}
+func newProxyWriter(w io.Writer) *proxyWriter {
+	return &proxyWriter{writer: w, writtenBytes: nil}
 }
 // CreateObject creates a new object
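
Swapping proxyReader for proxyWriter is what moves the caching work. The old reader wrapped the client's upload stream inside CreateObject, so every PutObject buffered the full object in memory whether or not it was ever read back; the new writer wraps the response stream inside GetObject, so only objects that are actually requested are cached, and the captured slice is released (writtenBytes = nil plus an asynchronous debug.FreeOSMemory) as soon as it has been handed to d.objects.Set.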
@@ -565,8 +562,7 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
 		}
 		expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
 	}
-	newReader := newProxyReader(reader)
-	objMetadata, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, newReader, metadata)
+	objMetadata, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, reader, metadata)
 	if err != nil {
 		switch iodine.ToError(err).(type) {
 		case donut.BadDigest:
@@ -574,10 +570,6 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
 		}
 		return "", iodine.New(err, errParams)
 	}
-	d.objects.Set(objectKey, newReader.readBytes)
-	// free up
-	newReader.readBytes = nil
-	go debug.FreeOSMemory()
 	newObject := drivers.ObjectMetadata{
 		Bucket: bucketName,
 		Key:    objectName,

pkg/storage/drivers/donut/multipart.go

@@ -193,6 +193,7 @@ func (d donutDriver) createObjectPart(bucketName, objectName, uploadID string, p
 	d.lock.Unlock()
 	// setting up for de-allocation
 	readBytes = nil
+	go debug.FreeOSMemory()
 	md5Sum := hex.EncodeToString(md5SumBytes)
 	// Verify if the written object is equal to what is expected, only if it is requested as such
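
This hunk, and several others in the PR, pair dropping the last reference to a large buffer with an asynchronous debug.FreeOSMemory() call so that freed pages are handed back to the operating system without blocking the request path. A small self-contained illustration of the pattern follows; the 64 MiB allocation and the MemStats printing are only for demonstration and are not part of the original code.

package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

func printHeap(label string) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("%s: HeapAlloc=%d KiB HeapReleased=%d KiB\n", label, m.HeapAlloc/1024, m.HeapReleased/1024)
}

func main() {
	// Stand-in for a multipart part that was buffered in memory.
	readBytes := make([]byte, 64<<20)
	readBytes[0] = 1
	printHeap("after allocation")

	// The pattern used in the PR: drop the only reference, then ask the
	// runtime to return unused pages to the OS. The PR issues this as
	// "go debug.FreeOSMemory()" so it stays off the request path; it is
	// called synchronously here only so the effect shows up below.
	readBytes = nil
	debug.FreeOSMemory()
	printHeap("after FreeOSMemory")
}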
@@ -258,6 +259,7 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st
 	d.lock.Lock()
 	var size int64
+	fullHasher := md5.New()
 	var fullObject bytes.Buffer
 	for i := 1; i <= len(parts); i++ {
 		recvMD5 := parts[i]
@@ -280,7 +282,8 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st
 				Key: getMultipartKey(objectName, uploadID, i),
 			}, nil)
 		}
-		_, err = io.Copy(&fullObject, bytes.NewBuffer(object))
+		mw := io.MultiWriter(&fullObject, fullHasher)
+		_, err = io.Copy(mw, bytes.NewReader(object))
 		if err != nil {
 			return "", iodine.New(err, nil)
 		}
@@ -289,9 +292,9 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st
 	}
 	d.lock.Unlock()
-	md5sumSlice := md5.Sum(fullObject.Bytes())
+	md5sumSlice := fullHasher.Sum(nil)
 	// this is needed for final verification inside CreateObject, do not convert this to hex
-	md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
+	md5sum := base64.StdEncoding.EncodeToString(md5sumSlice)
 	etag, err := d.CreateObject(bucketName, objectName, "", md5sum, size, &fullObject)
 	if err != nil {
 		// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
@@ -299,6 +302,8 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st
 		return "", iodine.New(err, nil)
 	}
 	fullObject.Reset()
+	go debug.FreeOSMemory()
 	d.cleanupMultiparts(bucketName, objectName, uploadID)
 	d.cleanupMultipartSession(bucketName, objectName, uploadID)
 	return etag, nil
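
CompleteMultipartUpload now feeds each stored part through io.MultiWriter into both the assembly buffer and a running MD5 hash, so the final checksum comes from fullHasher.Sum(nil) rather than a second full pass over fullObject.Bytes(). A minimal sketch of that arrangement, with literal byte slices standing in for the parts read back from the driver:

package main

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"io"
)

func main() {
	parts := [][]byte{
		[]byte("part-1 data"),
		[]byte("part-2 data"),
	}

	var fullObject bytes.Buffer
	fullHasher := md5.New()

	for _, object := range parts {
		// One copy feeds both the reassembled object and the running hash,
		// so no second pass over fullObject is needed for the checksum.
		mw := io.MultiWriter(&fullObject, fullHasher)
		if _, err := io.Copy(mw, bytes.NewReader(object)); err != nil {
			panic(err)
		}
	}

	// Base64 of the raw digest, matching what CompleteMultipartUpload hands
	// to CreateObject for verification (it must not be hex-encoded).
	md5sum := base64.StdEncoding.EncodeToString(fullHasher.Sum(nil))
	fmt.Println(fullObject.Len(), md5sum)
}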
@@ -421,5 +426,5 @@ func (d donutDriver) expiredPart(a ...interface{}) {
 	for _, storedBucket := range d.storedBuckets {
 		delete(storedBucket.partMetadata, key)
 	}
-	debug.FreeOSMemory()
+	go debug.FreeOSMemory()
 }
