object-cache: use golang bytes.Buffer and bytes.NewReader instead of custom implementation. (#2108)

master
Krishna Srinivas 9 years ago committed by Harshavardhana
parent 7bde27032d
commit 01cbacd803
  1. 143
      pkg/objcache/buffer.go
  2. 81
      pkg/objcache/objcache.go
  3. 95
      pkg/objcache/objcache_test.go
  4. 6
      xl-v1-object.go

@ -1,143 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package objcache
import (
"bytes"
"errors"
"io"
"time"
)
// A Buffer is a variable-sized buffer of bytes with Read, Write and Seek methods.
// The zero value for Buffer is an empty buffer ready to use.
type Buffer struct {
buf []byte // contents are the bytes buf[off : len(buf)]
off int // read at &buf[off], write at &buf[len(buf)]
bootstrap [64]byte // memory to hold first slice; helps small buffers (Printf) avoid allocation.
accessTime time.Time // accessTime holds value of the last access time of this buffer.
}
// NewBuffer creates and initializes a new Buffer using buf as its initial
// contents. It is intended to prepare a Buffer to read existing data. It
// can also be used to size the internal buffer for writing. To do that,
// buf should have the desired capacity but a length of zero.
//
// In most cases, new(Buffer) (or just declaring a Buffer variable) is
// sufficient to initialize a Buffer.
func NewBuffer(buf []byte) *Buffer { return &Buffer{buf: buf} }
// Len returns the number of bytes of the unread portion of the buffer;
// b.Len() == len(b.Bytes()).
func (b *Buffer) Len() int { return len(b.buf) - b.off }
// Size returns the original length of the underlying byte slice.
// Size is the number of bytes available for reading via ReadAt.
// The returned value is always the same and is not affected by calls
// to any other method.
func (b *Buffer) Size() int64 { return int64(len(b.buf)) }
// makeSlice allocates a slice of size n. If the allocation fails, it panics
// with ErrTooLarge.
func makeSlice(n int) []byte {
// If the make fails, give a known error.
defer func() {
if recover() != nil {
panic(bytes.ErrTooLarge)
}
}()
return make([]byte, n)
}
// grow grows the buffer to guarantee space for n more bytes.
// It returns the index where bytes should be written.
// If the buffer can't grow it will panic with ErrTooLarge.
func (b *Buffer) grow(n int) int {
m := b.Len()
// If buffer is empty, reset to recover space.
if m == 0 && b.off != 0 {
// Reuse buffer space.
b.buf = b.buf[0:0]
}
if len(b.buf)+n > cap(b.buf) {
var buf []byte
if b.buf == nil && n <= len(b.bootstrap) {
buf = b.bootstrap[0:]
} else if m+n <= cap(b.buf)/2 {
// We can slide things down instead of allocating a new
// slice. We only need m+n <= cap(b.buf) to slide, but
// we instead let capacity get twice as large so we
// don't spend all our time copying.
copy(b.buf[:], b.buf[b.off:])
buf = b.buf[:m]
} else {
// not enough space anywhere
buf = makeSlice(2*cap(b.buf) + n)
copy(buf, b.buf[b.off:])
}
b.buf = buf
b.off = 0
}
b.buf = b.buf[0 : b.off+m+n]
return b.off + m
}
// Write appends the contents of p to the buffer, growing the buffer as
// needed. The return value n is the length of p; err is always nil. If the
// buffer becomes too large, Write will panic with ErrTooLarge.
func (b *Buffer) Write(p []byte) (n int, err error) {
m := b.grow(len(p))
return copy(b.buf[m:], p), nil
}
// Read reads the next len(p) bytes from the buffer or until the buffer
// is drained. The return value n is the number of bytes read. If the
// buffer has no data to return, err is io.EOF (unless len(p) is zero);
// otherwise it is nil.
func (b *Buffer) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
if b.off >= len(b.buf) {
return 0, io.EOF
}
n = copy(p, b.buf[b.off:])
b.off += n
return
}
// Seek implements the io.Seeker interface.
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
var abs int64
switch whence {
case 0: // Whence 0 sets the offset as new offset.
abs = offset
case 1: // Whence 1 sets the current offset and offset as new offset.
abs = int64(b.off) + offset
case 2: // Whence 2 sets the total size of the buffer and offset
// as new offset, not supported yet. // FIXME.
return 0, errors.New("cache.Buffer.Seek: whence os.SEEK_END is not supported")
default:
return 0, errors.New("cache.Buffer.Seek: invalid whence")
}
if abs < 0 {
return 0, errors.New("cache.Buffer.Seek: negative position")
}
b.off = int(abs)
return abs, nil
}

@ -19,8 +19,8 @@
package objcache package objcache
import ( import (
"bytes"
"errors" "errors"
"fmt"
"io" "io"
"sync" "sync"
"time" "time"
@ -48,8 +48,8 @@ type Cache struct {
// totalEvicted counter to keep track of total expirations // totalEvicted counter to keep track of total expirations
totalEvicted int totalEvicted int
// Represents in memory file system. // map of objectName and its contents
entries map[string]*Buffer entries map[string][]byte
// Expiration in time duration. // Expiration in time duration.
expiry time.Duration expiry time.Duration
@ -63,7 +63,7 @@ func New(maxSize uint64, expiry time.Duration) *Cache {
return &Cache{ return &Cache{
mutex: &sync.RWMutex{}, mutex: &sync.RWMutex{},
maxSize: maxSize, maxSize: maxSize,
entries: make(map[string]*Buffer), entries: make(map[string][]byte),
expiry: expiry, expiry: expiry,
} }
} }
@ -74,33 +74,27 @@ var ErrKeyNotFoundInCache = errors.New("Key not found in cache")
// ErrCacheFull - cache is full. // ErrCacheFull - cache is full.
var ErrCacheFull = errors.New("Not enough space in cache") var ErrCacheFull = errors.New("Not enough space in cache")
// Size returns length of the value of a given key, returns -1 if key doesn't exist // Used for adding entry to the object cache. Implements io.WriteCloser
func (c *Cache) Size(key string) int64 { type cacheBuffer struct {
c.mutex.RLock() *bytes.Buffer // Implements io.Writer
defer c.mutex.RUnlock() onClose func()
_, ok := c.entries[key] }
if ok {
return c.entries[key].Size() // On close, onClose() is called which checks if all object contents
} // have been written so that it can save the buffer to the cache.
return -1 func (c cacheBuffer) Close() error {
c.onClose()
return nil
} }
// Create validates and returns an in memory writer referencing entry. // Create - validates if object size fits with in cache size limit and returns a io.WriteCloser
func (c *Cache) Create(key string, size int64) (writer io.Writer, err error) { // to which object contents can be written and finally Close()'d. During Close() we
// checks if the amount of data written is equal to the size of the object, in which
// case it saves the contents to object cache.
func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
// Recovers any panic generated and return errors appropriately.
defer func() {
if r := recover(); r != nil {
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("objcache: %v", r)
}
}
}() // Do not crash the server.
valueLen := uint64(size) valueLen := uint64(size)
if c.maxSize > 0 { if c.maxSize > 0 {
// Check if the size of the object is not bigger than the capacity of the cache. // Check if the size of the object is not bigger than the capacity of the cache.
@ -112,9 +106,33 @@ func (c *Cache) Create(key string, size int64) (writer io.Writer, err error) {
return nil, ErrCacheFull return nil, ErrCacheFull
} }
} }
c.entries[key] = NewBuffer(make([]byte, 0, int(size)))
c.currentSize += valueLen // Will hold the object contents.
return c.entries[key], nil buf := bytes.NewBuffer(make([]byte, 0, size))
// Account for the memory allocated above.
c.currentSize += uint64(size)
// Function called on close which saves the object contents
// to the object cache.
onClose := func() {
c.mutex.Lock()
defer c.mutex.Unlock()
if buf.Len() != int(size) {
// Full object not available hence do not save buf to object cache.
c.currentSize -= uint64(size)
return
}
// Full object available in buf, save it to cache.
c.entries[key] = buf.Bytes()
return
}
// Object contents that is written - cacheBuffer.Write(data)
// will be accumulated in buf which implements io.Writer.
return cacheBuffer{
buf,
onClose,
}, nil
} }
// Open - open the in-memory file, returns an in memory read seeker. // Open - open the in-memory file, returns an in memory read seeker.
@ -128,7 +146,7 @@ func (c *Cache) Open(key string) (io.ReadSeeker, error) {
if !ok { if !ok {
return nil, ErrKeyNotFoundInCache return nil, ErrKeyNotFoundInCache
} }
return buffer, nil return bytes.NewReader(buffer), nil
} }
// Delete - delete deletes an entry from in-memory fs. // Delete - delete deletes an entry from in-memory fs.
@ -139,8 +157,7 @@ func (c *Cache) Delete(key string) {
// Delete an entry. // Delete an entry.
buffer, ok := c.entries[key] buffer, ok := c.entries[key]
if ok { if ok {
size := buffer.Size() c.deleteEntry(key, int64(len(buffer)))
c.deleteEntry(key, size)
} }
} }

@ -1,95 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package objcache
import (
"io"
"io/ioutil"
"os"
"testing"
)
// Tests different types of seekable operations on an allocated buffer.
func TestBufferSeek(t *testing.T) {
r := NewBuffer([]byte("0123456789"))
tests := []struct {
off int64
seek int
n int
want string
wantpos int64
seekerr string
}{
{seek: os.SEEK_SET, off: 0, n: 20, want: "0123456789"},
{seek: os.SEEK_SET, off: 1, n: 1, want: "1"},
{seek: os.SEEK_CUR, off: 1, wantpos: 3, n: 2, want: "34"},
{seek: os.SEEK_SET, off: -1, seekerr: "cache.Buffer.Seek: negative position"},
{seek: os.SEEK_SET, off: 1 << 33, wantpos: 1 << 33},
{seek: os.SEEK_CUR, off: 1, wantpos: 1<<33 + 1},
{seek: os.SEEK_SET, n: 5, want: "01234"},
{seek: os.SEEK_CUR, n: 5, want: "56789"},
{seek: os.SEEK_END, off: -1, seekerr: "cache.Buffer.Seek: whence os.SEEK_END is not supported"},
}
for i, tt := range tests {
pos, err := r.Seek(tt.off, tt.seek)
if err == nil && tt.seekerr != "" {
t.Errorf("%d. want seek error %q", i, tt.seekerr)
continue
}
if err != nil && err.Error() != tt.seekerr {
t.Errorf("%d. seek error = %q; want %q", i, err.Error(), tt.seekerr)
continue
}
if tt.wantpos != 0 && tt.wantpos != pos {
t.Errorf("%d. pos = %d, want %d", i, pos, tt.wantpos)
}
buf := make([]byte, tt.n)
n, err := r.Read(buf)
if err != nil {
t.Errorf("%d. read = %v", i, err)
continue
}
got := string(buf[:n])
if got != tt.want {
t.Errorf("%d. got %q; want %q", i, got, tt.want)
}
}
}
// Tests read operation after big seek.
func TestReadAfterBigSeek(t *testing.T) {
r := NewBuffer([]byte("0123456789"))
if _, err := r.Seek(1<<31+5, os.SEEK_SET); err != nil {
t.Fatal(err)
}
if n, err := r.Read(make([]byte, 10)); n != 0 || err != io.EOF {
t.Errorf("Read = %d, %v; want 0, EOF", n, err)
}
}
// tests that Len is affected by reads, but Size is not.
func TestBufferLenSize(t *testing.T) {
r := NewBuffer([]byte("abc"))
io.CopyN(ioutil.Discard, r, 1)
if r.Len() != 2 {
t.Errorf("Len = %d; want 2", r.Len())
}
if r.Size() != 3 {
t.Errorf("Size = %d; want 3", r.Size())
}
}

@ -111,7 +111,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
} }
return nil return nil
} // Cache miss. } // Cache miss.
// For unknown error, return and error out. // For unknown error, return and error out.
if err != objcache.ErrKeyNotFoundInCache { if err != objcache.ErrKeyNotFoundInCache {
return err return err
@ -120,12 +119,13 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Cache is only set if whole object is being read. // Cache is only set if whole object is being read.
if startOffset == 0 && length == xlMeta.Stat.Size { if startOffset == 0 && length == xlMeta.Stat.Size {
// Proceed to set the cache. // Proceed to set the cache.
var newBuffer io.Writer var newBuffer io.WriteCloser
// Create a new entry in memory of length. // Create a new entry in memory of length.
newBuffer, err = xl.objCache.Create(path.Join(bucket, object), length) newBuffer, err = xl.objCache.Create(path.Join(bucket, object), length)
if err == nil { if err == nil {
// Create a multi writer to write to both memory and client response. // Create a multi writer to write to both memory and client response.
mw = io.MultiWriter(newBuffer, writer) mw = io.MultiWriter(newBuffer, writer)
defer newBuffer.Close()
} }
if err != nil && err != objcache.ErrCacheFull { if err != nil && err != objcache.ErrCacheFull {
// Perhaps cache is full, returns here. // Perhaps cache is full, returns here.
@ -153,8 +153,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Start reading the part name. // Start reading the part name.
n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize)
if err != nil { if err != nil {
// Purge the partial object upon any error.
xl.objCache.Delete(path.Join(bucket, object))
return err return err
} }

Loading…
Cancel
Save