diff --git a/pkg/objcache/buffer.go b/pkg/objcache/buffer.go deleted file mode 100644 index c99e49aa0..000000000 --- a/pkg/objcache/buffer.go +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package objcache - -import ( - "bytes" - "errors" - "io" - "time" -) - -// A Buffer is a variable-sized buffer of bytes with Read, Write and Seek methods. -// The zero value for Buffer is an empty buffer ready to use. -type Buffer struct { - buf []byte // contents are the bytes buf[off : len(buf)] - off int // read at &buf[off], write at &buf[len(buf)] - bootstrap [64]byte // memory to hold first slice; helps small buffers (Printf) avoid allocation. - accessTime time.Time // accessTime holds value of the last access time of this buffer. -} - -// NewBuffer creates and initializes a new Buffer using buf as its initial -// contents. It is intended to prepare a Buffer to read existing data. It -// can also be used to size the internal buffer for writing. To do that, -// buf should have the desired capacity but a length of zero. -// -// In most cases, new(Buffer) (or just declaring a Buffer variable) is -// sufficient to initialize a Buffer. -func NewBuffer(buf []byte) *Buffer { return &Buffer{buf: buf} } - -// Len returns the number of bytes of the unread portion of the buffer; -// b.Len() == len(b.Bytes()). -func (b *Buffer) Len() int { return len(b.buf) - b.off } - -// Size returns the original length of the underlying byte slice. -// Size is the number of bytes available for reading via ReadAt. -// The returned value is always the same and is not affected by calls -// to any other method. -func (b *Buffer) Size() int64 { return int64(len(b.buf)) } - -// makeSlice allocates a slice of size n. If the allocation fails, it panics -// with ErrTooLarge. -func makeSlice(n int) []byte { - // If the make fails, give a known error. - defer func() { - if recover() != nil { - panic(bytes.ErrTooLarge) - } - }() - return make([]byte, n) -} - -// grow grows the buffer to guarantee space for n more bytes. -// It returns the index where bytes should be written. -// If the buffer can't grow it will panic with ErrTooLarge. -func (b *Buffer) grow(n int) int { - m := b.Len() - // If buffer is empty, reset to recover space. - if m == 0 && b.off != 0 { - // Reuse buffer space. - b.buf = b.buf[0:0] - } - if len(b.buf)+n > cap(b.buf) { - var buf []byte - if b.buf == nil && n <= len(b.bootstrap) { - buf = b.bootstrap[0:] - } else if m+n <= cap(b.buf)/2 { - // We can slide things down instead of allocating a new - // slice. We only need m+n <= cap(b.buf) to slide, but - // we instead let capacity get twice as large so we - // don't spend all our time copying. - copy(b.buf[:], b.buf[b.off:]) - buf = b.buf[:m] - } else { - // not enough space anywhere - buf = makeSlice(2*cap(b.buf) + n) - copy(buf, b.buf[b.off:]) - } - b.buf = buf - b.off = 0 - } - b.buf = b.buf[0 : b.off+m+n] - return b.off + m -} - -// Write appends the contents of p to the buffer, growing the buffer as -// needed. The return value n is the length of p; err is always nil. If the -// buffer becomes too large, Write will panic with ErrTooLarge. -func (b *Buffer) Write(p []byte) (n int, err error) { - m := b.grow(len(p)) - return copy(b.buf[m:], p), nil -} - -// Read reads the next len(p) bytes from the buffer or until the buffer -// is drained. The return value n is the number of bytes read. If the -// buffer has no data to return, err is io.EOF (unless len(p) is zero); -// otherwise it is nil. -func (b *Buffer) Read(p []byte) (n int, err error) { - if len(p) == 0 { - return 0, nil - } - if b.off >= len(b.buf) { - return 0, io.EOF - } - n = copy(p, b.buf[b.off:]) - b.off += n - return -} - -// Seek implements the io.Seeker interface. -func (b *Buffer) Seek(offset int64, whence int) (int64, error) { - var abs int64 - switch whence { - case 0: // Whence 0 sets the offset as new offset. - abs = offset - case 1: // Whence 1 sets the current offset and offset as new offset. - abs = int64(b.off) + offset - case 2: // Whence 2 sets the total size of the buffer and offset - // as new offset, not supported yet. // FIXME. - return 0, errors.New("cache.Buffer.Seek: whence os.SEEK_END is not supported") - default: - return 0, errors.New("cache.Buffer.Seek: invalid whence") - } - if abs < 0 { - return 0, errors.New("cache.Buffer.Seek: negative position") - } - b.off = int(abs) - return abs, nil -} diff --git a/pkg/objcache/objcache.go b/pkg/objcache/objcache.go index 9e7d6ce4b..013fdebde 100644 --- a/pkg/objcache/objcache.go +++ b/pkg/objcache/objcache.go @@ -19,8 +19,8 @@ package objcache import ( + "bytes" "errors" - "fmt" "io" "sync" "time" @@ -48,8 +48,8 @@ type Cache struct { // totalEvicted counter to keep track of total expirations totalEvicted int - // Represents in memory file system. - entries map[string]*Buffer + // map of objectName and its contents + entries map[string][]byte // Expiration in time duration. expiry time.Duration @@ -63,7 +63,7 @@ func New(maxSize uint64, expiry time.Duration) *Cache { return &Cache{ mutex: &sync.RWMutex{}, maxSize: maxSize, - entries: make(map[string]*Buffer), + entries: make(map[string][]byte), expiry: expiry, } } @@ -74,33 +74,27 @@ var ErrKeyNotFoundInCache = errors.New("Key not found in cache") // ErrCacheFull - cache is full. var ErrCacheFull = errors.New("Not enough space in cache") -// Size returns length of the value of a given key, returns -1 if key doesn't exist -func (c *Cache) Size(key string) int64 { - c.mutex.RLock() - defer c.mutex.RUnlock() - _, ok := c.entries[key] - if ok { - return c.entries[key].Size() - } - return -1 +// Used for adding entry to the object cache. Implements io.WriteCloser +type cacheBuffer struct { + *bytes.Buffer // Implements io.Writer + onClose func() +} + +// On close, onClose() is called which checks if all object contents +// have been written so that it can save the buffer to the cache. +func (c cacheBuffer) Close() error { + c.onClose() + return nil } -// Create validates and returns an in memory writer referencing entry. -func (c *Cache) Create(key string, size int64) (writer io.Writer, err error) { +// Create - validates if object size fits with in cache size limit and returns a io.WriteCloser +// to which object contents can be written and finally Close()'d. During Close() we +// checks if the amount of data written is equal to the size of the object, in which +// case it saves the contents to object cache. +func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) { c.mutex.Lock() defer c.mutex.Unlock() - // Recovers any panic generated and return errors appropriately. - defer func() { - if r := recover(); r != nil { - var ok bool - err, ok = r.(error) - if !ok { - err = fmt.Errorf("objcache: %v", r) - } - } - }() // Do not crash the server. - valueLen := uint64(size) if c.maxSize > 0 { // Check if the size of the object is not bigger than the capacity of the cache. @@ -112,9 +106,33 @@ func (c *Cache) Create(key string, size int64) (writer io.Writer, err error) { return nil, ErrCacheFull } } - c.entries[key] = NewBuffer(make([]byte, 0, int(size))) - c.currentSize += valueLen - return c.entries[key], nil + + // Will hold the object contents. + buf := bytes.NewBuffer(make([]byte, 0, size)) + // Account for the memory allocated above. + c.currentSize += uint64(size) + + // Function called on close which saves the object contents + // to the object cache. + onClose := func() { + c.mutex.Lock() + defer c.mutex.Unlock() + if buf.Len() != int(size) { + // Full object not available hence do not save buf to object cache. + c.currentSize -= uint64(size) + return + } + // Full object available in buf, save it to cache. + c.entries[key] = buf.Bytes() + return + } + + // Object contents that is written - cacheBuffer.Write(data) + // will be accumulated in buf which implements io.Writer. + return cacheBuffer{ + buf, + onClose, + }, nil } // Open - open the in-memory file, returns an in memory read seeker. @@ -128,7 +146,7 @@ func (c *Cache) Open(key string) (io.ReadSeeker, error) { if !ok { return nil, ErrKeyNotFoundInCache } - return buffer, nil + return bytes.NewReader(buffer), nil } // Delete - delete deletes an entry from in-memory fs. @@ -139,8 +157,7 @@ func (c *Cache) Delete(key string) { // Delete an entry. buffer, ok := c.entries[key] if ok { - size := buffer.Size() - c.deleteEntry(key, size) + c.deleteEntry(key, int64(len(buffer))) } } diff --git a/pkg/objcache/objcache_test.go b/pkg/objcache/objcache_test.go deleted file mode 100644 index 70f2beadd..000000000 --- a/pkg/objcache/objcache_test.go +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package objcache - -import ( - "io" - "io/ioutil" - "os" - "testing" -) - -// Tests different types of seekable operations on an allocated buffer. -func TestBufferSeek(t *testing.T) { - r := NewBuffer([]byte("0123456789")) - tests := []struct { - off int64 - seek int - n int - want string - wantpos int64 - seekerr string - }{ - {seek: os.SEEK_SET, off: 0, n: 20, want: "0123456789"}, - {seek: os.SEEK_SET, off: 1, n: 1, want: "1"}, - {seek: os.SEEK_CUR, off: 1, wantpos: 3, n: 2, want: "34"}, - {seek: os.SEEK_SET, off: -1, seekerr: "cache.Buffer.Seek: negative position"}, - {seek: os.SEEK_SET, off: 1 << 33, wantpos: 1 << 33}, - {seek: os.SEEK_CUR, off: 1, wantpos: 1<<33 + 1}, - {seek: os.SEEK_SET, n: 5, want: "01234"}, - {seek: os.SEEK_CUR, n: 5, want: "56789"}, - {seek: os.SEEK_END, off: -1, seekerr: "cache.Buffer.Seek: whence os.SEEK_END is not supported"}, - } - - for i, tt := range tests { - pos, err := r.Seek(tt.off, tt.seek) - if err == nil && tt.seekerr != "" { - t.Errorf("%d. want seek error %q", i, tt.seekerr) - continue - } - if err != nil && err.Error() != tt.seekerr { - t.Errorf("%d. seek error = %q; want %q", i, err.Error(), tt.seekerr) - continue - } - if tt.wantpos != 0 && tt.wantpos != pos { - t.Errorf("%d. pos = %d, want %d", i, pos, tt.wantpos) - } - buf := make([]byte, tt.n) - n, err := r.Read(buf) - if err != nil { - t.Errorf("%d. read = %v", i, err) - continue - } - got := string(buf[:n]) - if got != tt.want { - t.Errorf("%d. got %q; want %q", i, got, tt.want) - } - } -} - -// Tests read operation after big seek. -func TestReadAfterBigSeek(t *testing.T) { - r := NewBuffer([]byte("0123456789")) - if _, err := r.Seek(1<<31+5, os.SEEK_SET); err != nil { - t.Fatal(err) - } - if n, err := r.Read(make([]byte, 10)); n != 0 || err != io.EOF { - t.Errorf("Read = %d, %v; want 0, EOF", n, err) - } -} - -// tests that Len is affected by reads, but Size is not. -func TestBufferLenSize(t *testing.T) { - r := NewBuffer([]byte("abc")) - io.CopyN(ioutil.Discard, r, 1) - if r.Len() != 2 { - t.Errorf("Len = %d; want 2", r.Len()) - } - if r.Size() != 3 { - t.Errorf("Size = %d; want 3", r.Size()) - } -} diff --git a/xl-v1-object.go b/xl-v1-object.go index 8654c3528..24e158d62 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -111,7 +111,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i } return nil } // Cache miss. - // For unknown error, return and error out. if err != objcache.ErrKeyNotFoundInCache { return err @@ -120,12 +119,13 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Cache is only set if whole object is being read. if startOffset == 0 && length == xlMeta.Stat.Size { // Proceed to set the cache. - var newBuffer io.Writer + var newBuffer io.WriteCloser // Create a new entry in memory of length. newBuffer, err = xl.objCache.Create(path.Join(bucket, object), length) if err == nil { // Create a multi writer to write to both memory and client response. mw = io.MultiWriter(newBuffer, writer) + defer newBuffer.Close() } if err != nil && err != objcache.ErrCacheFull { // Perhaps cache is full, returns here. @@ -153,8 +153,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Start reading the part name. n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) if err != nil { - // Purge the partial object upon any error. - xl.objCache.Delete(path.Join(bucket, object)) return err }