diff --git a/pkg/storage/donut/cache.go b/pkg/storage/donut/cache.go index 7e307c7bb..91e3c7adf 100644 --- a/pkg/storage/donut/cache.go +++ b/pkg/storage/donut/cache.go @@ -106,14 +106,16 @@ func NewCache(maxSize uint64, expiration time.Duration, donutName string, nodeDi // GetObject - GET object from cache buffer func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64, error) { cache.lock.RLock() - defer cache.lock.RUnlock() if !IsValidBucket(bucket) { + cache.lock.RUnlock() return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) } if !IsValidObjectName(object) { + cache.lock.RUnlock() return 0, iodine.New(ObjectNameInvalid{Object: object}, nil) } if _, ok := cache.storedBuckets[bucket]; ok == false { + cache.lock.RUnlock() return 0, iodine.New(BucketNotFound{Bucket: bucket}, nil) } objectKey := bucket + "/" + object @@ -122,20 +124,38 @@ func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64, if cache.donut != nil { reader, size, err := cache.donut.GetObject(bucket, object) if err != nil { + cache.lock.RUnlock() return 0, iodine.New(err, nil) } - written, err := io.CopyN(w, reader, size) + // new proxy writer to capture data read from disk + pw := newProxyWriter(w) + written, err := io.CopyN(pw, reader, size) if err != nil { + cache.lock.RUnlock() return 0, iodine.New(err, nil) } + cache.lock.RUnlock() + /// cache object read from disk + { + cache.lock.Lock() + ok := cache.objects.Set(objectKey, pw.writtenBytes) + cache.lock.Unlock() + pw.writtenBytes = nil + go debug.FreeOSMemory() + if !ok { + return 0, iodine.New(InternalError{}, nil) + } + } return written, nil } + cache.lock.RUnlock() return 0, iodine.New(ObjectNotFound{Object: object}, nil) } - written, err := io.Copy(w, bytes.NewBuffer(data)) + written, err := io.CopyN(w, bytes.NewBuffer(data), int64(cache.objects.Len(objectKey))) if err != nil { return 0, iodine.New(err, nil) } + cache.lock.RUnlock() return written, nil } @@ -148,14 +168,16 @@ func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, l "length": strconv.FormatInt(length, 10), } cache.lock.RLock() - defer cache.lock.RUnlock() if !IsValidBucket(bucket) { + cache.lock.RUnlock() return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, errParams) } if !IsValidObjectName(object) { + cache.lock.RUnlock() return 0, iodine.New(ObjectNameInvalid{Object: object}, errParams) } if start < 0 { + cache.lock.RUnlock() return 0, iodine.New(InvalidRange{ Start: start, Length: length, @@ -167,23 +189,40 @@ func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, l if cache.donut != nil { reader, _, err := cache.donut.GetObject(bucket, object) if err != nil { + cache.lock.RUnlock() return 0, iodine.New(err, nil) } if _, err := io.CopyN(ioutil.Discard, reader, start); err != nil { + cache.lock.RUnlock() return 0, iodine.New(err, nil) } + pw := newProxyWriter(w) written, err := io.CopyN(w, reader, length) if err != nil { + cache.lock.RUnlock() return 0, iodine.New(err, nil) } + cache.lock.RUnlock() + { + cache.lock.Lock() + ok := cache.objects.Set(objectKey, pw.writtenBytes) + cache.lock.Unlock() + pw.writtenBytes = nil + go debug.FreeOSMemory() + if !ok { + return 0, iodine.New(InternalError{}, nil) + } + } return written, nil } + cache.lock.RUnlock() return 0, iodine.New(ObjectNotFound{Object: object}, nil) } written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length) if err != nil { return 0, iodine.New(err, nil) } + cache.lock.RUnlock() return written, nil } @@ -317,13 +356,24 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } + if cache.donut != nil { + objMetadata, err := cache.donut.PutObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType}) + if err != nil { + return ObjectMetadata{}, iodine.New(err, nil) + } + cache.lock.Lock() + storedBucket.objectMetadata[objectKey] = objMetadata + cache.storedBuckets[bucket] = storedBucket + cache.lock.Unlock() + return objMetadata, nil + } // calculate md5 hash := md5.New() - var readBytes []byte var err error - var length int + var totalLength int for err == nil { + var length int byteBuffer := make([]byte, 1024*1024) length, err = data.Read(byteBuffer) // While hash.Write() wouldn't mind a Nil byteBuffer @@ -332,24 +382,20 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, break } hash.Write(byteBuffer[0:length]) - readBytes = append(readBytes, byteBuffer[0:length]...) + cache.lock.Lock() + ok := cache.objects.Append(objectKey, byteBuffer[0:length]) + cache.lock.Unlock() + if !ok { + return ObjectMetadata{}, iodine.New(InternalError{}, nil) + } + totalLength += length + go debug.FreeOSMemory() } if err != io.EOF { return ObjectMetadata{}, iodine.New(err, nil) } - md5SumBytes := hash.Sum(nil) - totalLength := len(readBytes) - - cache.lock.Lock() - ok := cache.objects.Set(objectKey, readBytes) - // setting up for de-allocation - readBytes = nil - go debug.FreeOSMemory() - cache.lock.Unlock() - if !ok { - return ObjectMetadata{}, iodine.New(InternalError{}, nil) - } + md5SumBytes := hash.Sum(nil) md5Sum := hex.EncodeToString(md5SumBytes) // Verify if the written object is equal to what is expected, only if it is requested as such if strings.TrimSpace(expectedMD5Sum) != "" { @@ -576,14 +622,8 @@ func (cache Cache) expiredObject(a ...interface{}) { cacheStats.Bytes, cacheStats.Items, cacheStats.Expired) key := a[0].(string) // loop through all buckets - for bucket, storedBucket := range cache.storedBuckets { + for _, storedBucket := range cache.storedBuckets { delete(storedBucket.objectMetadata, key) - // remove bucket if no objects found anymore - if len(storedBucket.objectMetadata) == 0 { - if time.Since(cache.storedBuckets[bucket].bucketMetadata.Created) > cache.expiration { - delete(cache.storedBuckets, bucket) - } - } } debug.FreeOSMemory() } diff --git a/pkg/storage/donut/trove/trove.go b/pkg/storage/donut/trove/trove.go index dce8566bc..7bc8e5ccd 100644 --- a/pkg/storage/donut/trove/trove.go +++ b/pkg/storage/donut/trove/trove.go @@ -115,6 +115,17 @@ func (r *Cache) Get(key string) ([]byte, bool) { return value, true } +// Len returns length of the value of a given key, returns zero if key doesn't exist +func (r *Cache) Len(key string) int { + r.Lock() + defer r.Unlock() + _, ok := r.items[key] + if !ok { + return 0 + } + return len(r.items[key]) +} + // Append will append new data to an existing key, // if key doesn't exist it behaves like Set() func (r *Cache) Append(key string, value []byte) bool {