From d2557bb53846082b05f2b8182d6caa4ec762e200 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 3 Jul 2016 16:58:21 -0700 Subject: [PATCH] XL: GetObject caching implemented for XL. (#2017) The object cache implementation is XL cache, which defaults to 8GB worth of read cache. Currently GetObject() transparently writes to this cache upon first client read and then subsequently serves reads from the same cache. Currently expiration is not implemented. --- globals.go | 2 + object-api_test.go | 6 +- object-handlers.go | 12 +-- pkg/objcache/buffer.go | 143 ++++++++++++++++++++++++++++++++++ pkg/objcache/objcache.go | 143 ++++++++++++++++++++++++++++++++++ pkg/objcache/objcache_test.go | 95 ++++++++++++++++++++++ server-main.go | 8 ++ strconv-bytes.go | 117 ++++++++++++++++++++++++++++ strconv-bytes_test.go | 112 ++++++++++++++++++++++++++ xl-v1-errors.go | 37 +++++++++ xl-v1-object.go | 42 +++++++++- xl-v1.go | 48 +++++------- 12 files changed, 729 insertions(+), 36 deletions(-) create mode 100644 pkg/objcache/buffer.go create mode 100644 pkg/objcache/objcache.go create mode 100644 pkg/objcache/objcache_test.go create mode 100644 strconv-bytes.go create mode 100644 strconv-bytes_test.go create mode 100644 xl-v1-errors.go diff --git a/globals.go b/globals.go index 431568159..21182abf0 100644 --- a/globals.go +++ b/globals.go @@ -43,6 +43,8 @@ var ( // Maximum connections handled per // server, defaults to 0 (unlimited). globalMaxConn = 0 + // Maximum cache size. + globalMaxCacheSize = uint64(maxCacheSize) // Add new variable global values here. ) diff --git a/object-api_test.go b/object-api_test.go index e4293810a..cdb8cd142 100644 --- a/object-api_test.go +++ b/object-api_test.go @@ -16,9 +16,7 @@ package main -import ( - . "gopkg.in/check.v1" -) +import . "gopkg.in/check.v1" type ObjectLayerAPISuite struct{} @@ -28,6 +26,7 @@ var _ = Suite(&ObjectLayerAPISuite{}) func (s *ObjectLayerAPISuite) TestFSAPISuite(c *C) { // Initialize name space lock. 
initNSLock() + // function which creates a temp FS backend and executes the object layer suite test. execObjectLayerSuiteTestFS := func(objSuiteTest objSuiteTestType) { // create temp object layer backend. @@ -51,6 +50,7 @@ type objSuiteTestType func(c *C, obj ObjectLayer) func (s *ObjectLayerAPISuite) TestXLAPISuite(c *C) { // Initialize name space lock. initNSLock() + // function which creates a temp XL backend and executes the object layer suite test. execObjectLayerSuiteTestXL := func(objSuiteTest objSuiteTestType) { // create temp object layer backend. diff --git a/object-handlers.go b/object-handlers.go index 4560f2c92..4de55d4bf 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -115,6 +115,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } + // Caculate the http Range. var hrange *httpRange hrange, err = getRequestedRange(r.Header.Get("Range"), objInfo.Size) if err != nil { @@ -144,22 +145,23 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req if length == 0 { length = objInfo.Size - startOffset } + + // Reads the object at startOffset and writes to mw. if err := api.ObjectAPI.GetObject(bucket, object, startOffset, length, w); err != nil { - errorIf(err, "Writing to client failed.") + errorIf(err, "Unable to write to client.") // Do not send error response here, client would have already died. return } + // Success. } -var unixEpochTime = time.Unix(0, 0) - // checkLastModified implements If-Modified-Since and // If-Unmodified-Since checks. // // modtime is the modification time of the resource to be served, or // IsZero(). return value is whether this request is now complete. 
func checkLastModified(w http.ResponseWriter, r *http.Request, modtime time.Time) bool { - if modtime.IsZero() || modtime.Equal(unixEpochTime) { + if modtime.IsZero() || modtime.Equal(time.Unix(0, 0)) { // If the object doesn't have a modtime (IsZero), or the modtime // is obviously garbage (Unix time == 0), then ignore modtimes // and don't process the If-Modified-Since header. @@ -452,7 +454,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // modtime is the modification time of the resource to be served, or // IsZero(). return value is whether this request is now complete. func checkCopySourceLastModified(w http.ResponseWriter, r *http.Request, modtime time.Time) bool { - if modtime.IsZero() || modtime.Equal(unixEpochTime) { + if modtime.IsZero() || modtime.Equal(time.Unix(0, 0)) { // If the object doesn't have a modtime (IsZero), or the modtime // is obviously garbage (Unix time == 0), then ignore modtimes // and don't process the If-Modified-Since header. diff --git a/pkg/objcache/buffer.go b/pkg/objcache/buffer.go new file mode 100644 index 000000000..c99e49aa0 --- /dev/null +++ b/pkg/objcache/buffer.go @@ -0,0 +1,143 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package objcache + +import ( + "bytes" + "errors" + "io" + "time" +) + +// A Buffer is a variable-sized buffer of bytes with Read, Write and Seek methods. 
// The zero value for Buffer is an empty buffer ready to use.
//
// A Buffer holds the bytes of one cache entry. Write appends to the contents,
// while Read and Seek only move a read cursor, so the same bytes can be read
// again after seeking back.
type Buffer struct {
	buf        []byte    // every byte written so far, oldest first
	off        int       // read cursor into buf; Seek may place it past len(buf)
	bootstrap  [64]byte  // backing array for a small first write; avoids an allocation
	accessTime time.Time // last access time of this buffer; not updated by any method here
}

// NewBuffer creates and initializes a new Buffer using buf as its initial
// contents. It is intended to prepare a Buffer to read existing data. It
// can also be used to size the internal buffer for writing. To do that,
// buf should have the desired capacity but a length of zero.
//
// In most cases, new(Buffer) (or just declaring a Buffer variable) is
// sufficient to initialize a Buffer.
func NewBuffer(buf []byte) *Buffer { return &Buffer{buf: buf} }

// Len returns the number of bytes between the read cursor and the end of the
// contents. It is zero, never negative, when the cursor has been moved to or
// past the end with Seek.
func (b *Buffer) Len() int {
	if b.off >= len(b.buf) {
		return 0
	}
	return len(b.buf) - b.off
}

// Size returns the total number of bytes held by the buffer. Reads and seeks
// do not change it; only Write makes it grow.
func (b *Buffer) Size() int64 { return int64(len(b.buf)) }

// makeSlice allocates a slice of size n. If the allocation fails, it panics
// with ErrTooLarge.
func makeSlice(n int) []byte {
	// If the make fails, give a known error.
	defer func() {
		if recover() != nil {
			panic(bytes.ErrTooLarge)
		}
	}()
	return make([]byte, n)
}

// grow extends the contents by n bytes and returns the index at which those
// bytes should be written, which is always the previous end of the contents.
// Existing bytes are kept in place regardless of the read cursor, so data that
// was already read stays available to a later Seek.
// If the buffer can't grow it will panic with ErrTooLarge.
func (b *Buffer) grow(n int) int {
	end := len(b.buf)
	if end+n > cap(b.buf) {
		var grown []byte
		if b.buf == nil && n <= len(b.bootstrap) {
			// First small write: use the embedded array instead of allocating.
			grown = b.bootstrap[:]
		} else {
			// Over-allocate so repeated small writes do not copy every time.
			grown = makeSlice(2*cap(b.buf) + n)
			copy(grown, b.buf)
		}
		b.buf = grown[:end]
	}
	b.buf = b.buf[:end+n]
	return end
}

// Write appends the contents of p to the buffer, growing the buffer as
// needed. The return value n is the length of p; err is always nil. If the
// buffer becomes too large, Write will panic with ErrTooLarge.
func (b *Buffer) Write(p []byte) (n int, err error) {
	m := b.grow(len(p))
	return copy(b.buf[m:], p), nil
}

// Read copies up to len(p) bytes starting at the read cursor into p and
// advances the cursor by the number of bytes copied. It returns io.EOF when
// the cursor is at or past the end of the contents (unless len(p) is zero);
// otherwise err is nil.
func (b *Buffer) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	if b.off >= len(b.buf) {
		return 0, io.EOF
	}
	n = copy(p, b.buf[b.off:])
	b.off += n
	return n, nil
}

// Seek implements the io.Seeker interface. Whence 0 is relative to the start
// of the buffer and whence 1 to the current read cursor; whence 2 (relative to
// the end) is not supported and returns an error. Seeking past the end is
// allowed: subsequent reads return io.EOF.
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
	// Largest value the cursor can hold on this platform.
	const maxInt = int(^uint(0) >> 1)

	var abs int64
	switch whence {
	case 0: // Whence 0 sets the offset as new offset.
		abs = offset
	case 1: // Whence 1 sets the current offset and offset as new offset.
		abs = int64(b.off) + offset
	case 2: // Whence 2 sets the total size of the buffer and offset
		// as new offset, not supported yet. // FIXME.
		return 0, errors.New("cache.Buffer.Seek: whence os.SEEK_END is not supported")
	default:
		return 0, errors.New("cache.Buffer.Seek: invalid whence")
	}
	if abs < 0 {
		return 0, errors.New("cache.Buffer.Seek: negative position")
	}
	if abs > int64(maxInt) {
		// Where int is 32 bits a plain conversion would wrap around and
		// point back into the data; any position past the end reads as EOF,
		// so clamping keeps the observable behavior correct.
		b.off = maxInt
	} else {
		b.off = int(abs)
	}
	return abs, nil
}

// NoExpiration represents caches to be permanent and can only be deleted.
var NoExpiration = time.Duration(0)

// Cache holds the required variables to compose an in memory cache system
// with an upper bound on the total reserved size.
type Cache struct {
	// Mutex is used for handling the concurrent
	// read/write requests for cache
	mutex *sync.RWMutex

	// maxSize is a total size for overall cache, zero means no limit.
	maxSize uint64

	// currentSize is the sum of the sizes reserved by all entries.
	currentSize uint64

	// OnEviction - callback function for eviction, invoked with the key
	// of the removed entry while the cache lock is held.
	OnEviction func(a ...interface{})

	// totalEvicted counter to keep track of total removals.
	totalEvicted int

	// Represents in memory file system.
	entries map[string]*Buffer

	// reserved records, per key, the size that Create charged against
	// currentSize, so that the same amount is released again later.
	reserved map[string]uint64

	// Expiration in time duration, stored but not acted upon yet.
	expiry time.Duration
}

// New creates an inmemory cache
//
// maxSize bounds the total size reserved by entries, zero disables the bound.
// expiry is recorded on the cache; no method here expires entries yet.
func New(maxSize uint64, expiry time.Duration) *Cache {
	return &Cache{
		mutex:    &sync.RWMutex{},
		maxSize:  maxSize,
		entries:  make(map[string]*Buffer),
		reserved: make(map[string]uint64),
		expiry:   expiry,
	}
}

// ErrKeyNotFoundInCache - key not found in cache.
var ErrKeyNotFoundInCache = errors.New("Key not found in cache")

// ErrCacheFull - cache is full.
var ErrCacheFull = errors.New("Not enough space in cache")

// Size returns length of the value of a given key, returns -1 if key doesn't exist
func (c *Cache) Size(key string) int64 {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	if entry, ok := c.entries[key]; ok {
		return entry.Size()
	}
	return -1
}

// Create reserves size bytes for key and returns a writer that fills the new
// entry. An existing entry under the same key is replaced and its reservation
// is released, so re-creating a key does not inflate the accounted size.
//
// ErrCacheFull is returned when the reservation does not fit, and also for a
// size that is negative or does not fit in an int, since such an entry can
// never be allocated.
func (c *Cache) Create(key string, size int64) (io.Writer, error) {
	if size < 0 || int64(int(size)) != size {
		return nil, ErrCacheFull
	}

	c.mutex.Lock()
	defer c.mutex.Unlock()

	valueLen := uint64(size)
	// Space held by every entry other than the one being replaced.
	inUse := c.currentSize - c.reserved[key]
	if c.maxSize > 0 {
		// Check if the size of the object is not bigger than the capacity of the cache.
		if valueLen > c.maxSize {
			return nil, ErrCacheFull
		}
		// TODO - auto expire random key.
		if inUse+valueLen > c.maxSize {
			return nil, ErrCacheFull
		}
	}
	entry := NewBuffer(make([]byte, 0, int(size)))
	c.entries[key] = entry
	c.reserved[key] = valueLen
	c.currentSize = inUse + valueLen
	return entry, nil
}

// Open returns a reader positioned at the start of the entry stored under key.
// Every call returns its own reader, so concurrent callers do not disturb each
// other's read position.
//
// ErrKeyNotFoundInCache is returned when key does not exist, and also when
// fewer bytes than were reserved by Create have been written so far: a
// partially filled entry would otherwise be served as a truncated object.
//
// NOTE(review): writes through the writer returned by Create are not
// synchronized with this method; callers appear to be expected to finish
// filling an entry before it is opened concurrently - confirm.
func (c *Cache) Open(key string) (io.ReadSeeker, error) {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	entry, ok := c.entries[key]
	if !ok {
		return nil, ErrKeyNotFoundInCache
	}
	data := entry.buf
	if uint64(len(data)) < c.reserved[key] {
		return nil, ErrKeyNotFoundInCache
	}
	// Cap the view at its length so the reader can never touch bytes beyond
	// what was complete at the time of the call.
	return NewBuffer(data[:len(data):len(data)]), nil
}

// Delete - delete deletes an entry from in-memory fs and releases the size
// that was reserved for it. Deleting a missing key is a no-op.
func (c *Cache) Delete(key string) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if _, ok := c.entries[key]; ok {
		c.deleteEntry(key, int64(c.reserved[key]))
	}
}

// deleteEntry removes key, gives size bytes back to the cache budget, counts
// the eviction and notifies OnEviction when it is set. The caller must hold
// the write lock.
func (c *Cache) deleteEntry(key string, size int64) {
	delete(c.entries, key)
	delete(c.reserved, key)
	if size > 0 {
		released := uint64(size)
		// Clamp rather than wrap around if the accounting is ever off.
		if released > c.currentSize {
			released = c.currentSize
		}
		c.currentSize -= released
	}
	c.totalEvicted++
	if c.OnEviction != nil {
		c.OnEviction(key)
	}
}
+func TestBufferSeek(t *testing.T) { + r := NewBuffer([]byte("0123456789")) + tests := []struct { + off int64 + seek int + n int + want string + wantpos int64 + seekerr string + }{ + {seek: os.SEEK_SET, off: 0, n: 20, want: "0123456789"}, + {seek: os.SEEK_SET, off: 1, n: 1, want: "1"}, + {seek: os.SEEK_CUR, off: 1, wantpos: 3, n: 2, want: "34"}, + {seek: os.SEEK_SET, off: -1, seekerr: "cache.Buffer.Seek: negative position"}, + {seek: os.SEEK_SET, off: 1 << 33, wantpos: 1 << 33}, + {seek: os.SEEK_CUR, off: 1, wantpos: 1<<33 + 1}, + {seek: os.SEEK_SET, n: 5, want: "01234"}, + {seek: os.SEEK_CUR, n: 5, want: "56789"}, + {seek: os.SEEK_END, off: -1, seekerr: "cache.Buffer.Seek: whence os.SEEK_END is not supported"}, + } + + for i, tt := range tests { + pos, err := r.Seek(tt.off, tt.seek) + if err == nil && tt.seekerr != "" { + t.Errorf("%d. want seek error %q", i, tt.seekerr) + continue + } + if err != nil && err.Error() != tt.seekerr { + t.Errorf("%d. seek error = %q; want %q", i, err.Error(), tt.seekerr) + continue + } + if tt.wantpos != 0 && tt.wantpos != pos { + t.Errorf("%d. pos = %d, want %d", i, pos, tt.wantpos) + } + buf := make([]byte, tt.n) + n, err := r.Read(buf) + if err != nil { + t.Errorf("%d. read = %v", i, err) + continue + } + got := string(buf[:n]) + if got != tt.want { + t.Errorf("%d. got %q; want %q", i, got, tt.want) + } + } +} + +// Tests read operation after big seek. +func TestReadAfterBigSeek(t *testing.T) { + r := NewBuffer([]byte("0123456789")) + if _, err := r.Seek(1<<31+5, os.SEEK_SET); err != nil { + t.Fatal(err) + } + if n, err := r.Read(make([]byte, 10)); n != 0 || err != io.EOF { + t.Errorf("Read = %d, %v; want 0, EOF", n, err) + } +} + +// tests that Len is affected by reads, but Size is not. 
+func TestBufferLenSize(t *testing.T) { + r := NewBuffer([]byte("abc")) + io.CopyN(ioutil.Discard, r, 1) + if r.Len() != 2 { + t.Errorf("Len = %d; want 2", r.Len()) + } + if r.Size() != 3 { + t.Errorf("Size = %d; want 3", r.Size()) + } +} diff --git a/server-main.go b/server-main.go index 703be5267..adbdfde35 100644 --- a/server-main.go +++ b/server-main.go @@ -141,6 +141,14 @@ func initServerConfig(c *cli.Context) { fatalIf(err, "Unable to convert MINIO_MAXCONN=%s environment variable into its integer value.", maxConnStr) } + // Fetch max cache size from environment variable. + if maxCacheSizeStr := os.Getenv("MINIO_CACHE_SIZE"); maxCacheSizeStr != "" { + // We need to parse cache size to its integer value. + var err error + globalMaxCacheSize, err = strconvBytes(maxCacheSizeStr) + fatalIf(err, "Unable to convert MINIO_CACHE_SIZE=%s environment variable into its integer value.", maxCacheSizeStr) + } + // Fetch access keys from environment variables if any and update the config. accessKey := os.Getenv("MINIO_ACCESS_KEY") secretKey := os.Getenv("MINIO_SECRET_KEY") diff --git a/strconv-bytes.go b/strconv-bytes.go new file mode 100644 index 000000000..daf5df477 --- /dev/null +++ b/strconv-bytes.go @@ -0,0 +1,117 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// IEC binary sizes, each step is a factor of 1024.
const (
	Byte = 1 << (iota * 10)
	KiByte
	MiByte
	GiByte
	TiByte
	PiByte
	EiByte
)

// SI decimal sizes, each step is a factor of 1000.
const (
	IByte = 1
	KByte = IByte * 1000
	MByte = KByte * 1000
	GByte = MByte * 1000
	TByte = GByte * 1000
	PByte = TByte * 1000
	EByte = PByte * 1000
)

// This table represents both IEC and SI notations with their corresponding
// values. Keys are lower case; lookups lower-case the unit first.
var bytesSizeTable = map[string]uint64{
	"b":   Byte,
	"kib": KiByte,
	"kb":  KByte,
	"mib": MiByte,
	"mb":  MByte,
	"gib": GiByte,
	"gb":  GByte,
	"tib": TiByte,
	"tb":  TByte,
	"pib": PiByte,
	"pb":  PByte,
	"eib": EiByte,
	"eb":  EByte,
	// Same units without the trailing 'b'.
	"":   Byte,
	"ki": KiByte,
	"k":  KByte,
	"mi": MiByte,
	"m":  MByte,
	"gi": GiByte,
	"g":  GByte,
	"ti": TiByte,
	"t":  TByte,
	"pi": PiByte,
	"p":  PByte,
	"ei": EiByte,
	"e":  EByte,
}

// strconvBytes parses a string representation of bytes into the number
// of bytes it represents.
//
// The input is a decimal number, optionally with a fractional part, followed
// by an optional unit from bytesSizeTable. The unit is case insensitive and
// may be separated from the number by spaces; spaces around the whole input
// are ignored.
//
// strconvBytes("42MB") -> 42000000, nil
// strconvBytes("42mib") -> 44040192, nil
//
// An error is returned when the number cannot be parsed, the unit is unknown
// or the result does not fit into a uint64.
func strconvBytes(s string) (uint64, error) {
	s = strings.TrimSpace(s)

	// Byte length of the leading numeric part. The index handed out by range
	// is a byte offset, so it stays valid for slicing even when the input
	// contains multi-byte runes.
	numLen := len(s)
	for i, r := range s {
		// This supports decimals as well.
		if !(unicode.IsDigit(r) || r == '.') {
			numLen = i
			break
		}
	}

	// Float parsing to deal with decimal inputs.
	f, err := strconv.ParseFloat(s[:numLen], 64)
	if err != nil {
		return 0, err
	}

	// Fetch the corresponding byte size for notation.
	byteSize := strings.ToLower(strings.TrimSpace(s[numLen:]))
	size, ok := bytesSizeTable[byteSize]
	if !ok {
		return 0, fmt.Errorf("Unrecognized size notation name: %v", byteSize)
	}
	f *= float64(size)
	// Return an error if final value overflows uint64 max.
	if f >= math.MaxUint64 {
		return 0, fmt.Errorf("too large: %v", s)
	}
	// Success.
	return uint64(f), nil
}

// Tests various variants in supporting all the byte conversions.
func TestByteConv(t *testing.T) {
	// List of all tests for testing notation to corresponding
	// byte conversions.
	tests := []struct {
		in  string
		exp uint64
	}{
		// Units spelled with the trailing 'B'.
		{"42", 42},
		{"42MB", 42000000},
		{"42MiB", 44040192},
		{"42mb", 42000000},
		{"42mib", 44040192},
		{"42MIB", 44040192},
		{"42 MB", 42000000},
		{"42 MiB", 44040192},
		{"42 mb", 42000000},
		{"42 mib", 44040192},
		{"42 MIB", 44040192},
		{"42.5MB", 42500000},
		{"42.5MiB", 44564480},
		{"42.5 MB", 42500000},
		{"42.5 MiB", 44564480},
		// Units spelled without the trailing 'B'.
		{"42M", 42000000},
		{"42Mi", 44040192},
		{"42m", 42000000},
		{"42mi", 44040192},
		{"42MI", 44040192},
		{"42 M", 42000000},
		{"42 Mi", 44040192},
		{"42 m", 42000000},
		{"42 mi", 44040192},
		{"42 MI", 44040192},
		// With decimal values.
		{"42.5M", 42500000},
		{"42.5Mi", 44564480},
		{"42.5 M", 42500000},
		{"42.5 Mi", 44564480},
		// With no more digits after '.'
		{"42.M", 42000000},
		{"42.Mi", 44040192},
		{"42. m", 42000000},
		{"42. mi", 44040192},
		{"42. M", 42000000},
		{"42. Mi", 44040192},
		// Large testing, breaks when too much larger than this.
		{"12.5 EB", uint64(12.5 * float64(EByte))},
		{"12.5 E", uint64(12.5 * float64(EByte))},
		{"12.5 EiB", uint64(12.5 * float64(EiByte))},
	}

	// Tests all notation variants.
	for _, p := range tests {
		got, err := strconvBytes(p.in)
		if err != nil {
			// The value is meaningless on error, report once and move on.
			t.Errorf("Couldn't parse %v: %v", p.in, err)
			continue
		}
		if got != p.exp {
			t.Errorf("Expected %v for %v, got %v", p.exp, p.in, got)
		}
	}
}

// Validates different types of input errors.
func TestByteErrors(t *testing.T) {
	// Unknown unit name.
	got, err := strconvBytes("84 JB")
	if err == nil {
		t.Errorf("Expected error, got %v", got)
	}
	// Empty string.
	got, err = strconvBytes("")
	if err == nil {
		t.Errorf("Expected error parsing nothing")
	}
	// Too large.
	got, err = strconvBytes("16 EiB")
	if err == nil {
		t.Errorf("Expected error, got %v", got)
	}
}

// Add benchmarks here.

// Benchmarks for bytes converter.
func BenchmarkParseBytes(b *testing.B) {
	for i := 0; i < b.N; i++ {
		strconvBytes("16.5 GB")
	}
}
// Errors reported by the XL object layer.
var (
	// errXLMaxDisks - returned for reached maximum of disks.
	errXLMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'")

	// errXLMinDisks - returned for minimum number of disks.
	errXLMinDisks = errors.New("Minimum '6' disks are required to enable erasure code")

	// errXLNumDisks - returned for odd number of disks.
	errXLNumDisks = errors.New("Total number of disks should be multiples of '2'")

	// errXLReadQuorum - did not meet read quorum.
	errXLReadQuorum = errors.New("Read failed. Insufficient number of disks online")

	// errXLWriteQuorum - did not meet write quorum.
	errXLWriteQuorum = errors.New("Write failed. Insufficient number of disks online")

	// errXLDataCorrupt - err data corrupt.
	errXLDataCorrupt = errors.New("Data likely corrupted, read failed.")
)
+ if startOffset == 0 && length == xlMeta.Stat.Size { + // Create a new entry in memory of length. + newBuffer, err = xl.objCache.Create(path.Join(bucket, object), length) + if err != nil { + // Perhaps cache is full, returns here. + return err + } + // Create a multi writer to write to both memory and client response. + mw = io.MultiWriter(newBuffer, writer) + } + } + totalBytesRead := int64(0) // Read from all parts. for ; partIndex <= lastPartIndex; partIndex++ { @@ -109,7 +149,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i } // Start reading the part name. - n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) + n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) if err != nil { return err } diff --git a/xl-v1.go b/xl-v1.go index a346bbdaf..d5b4993ac 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -17,11 +17,11 @@ package main import ( - "errors" "fmt" "sort" "github.com/minio/minio/pkg/disk" + "github.com/minio/minio/pkg/objcache" ) // XL constants. @@ -37,6 +37,15 @@ const ( // Uploads metadata file carries per multipart object metadata. uploadsJSONFile = "uploads.json" + + // 8GiB cache by default. + maxCacheSize = 8 * 1024 * 1024 * 1024 + + // Maximum erasure blocks. + maxErasureBlocks = 16 + + // Minimum erasure blocks. + minErasureBlocks = 6 ) // xlObjects - Implements XL object layer. @@ -48,34 +57,15 @@ type xlObjects struct { readQuorum int // readQuorum minimum required disks to read data. writeQuorum int // writeQuorum minimum required disks to write data. - // List pool management. + // ListObjects pool management. listPool *treeWalkPool -} - -// errXLMaxDisks - returned for reached maximum of disks. -var errXLMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'") - -// errXLMinDisks - returned for minimum number of disks. 
-var errXLMinDisks = errors.New("Number of disks are smaller than supported minimum count '8'") -// errXLNumDisks - returned for odd number of disks. -var errXLNumDisks = errors.New("Number of disks should be multiples of '2'") + // Object cache for caching objects. + objCache *objcache.Cache -// errXLReadQuorum - did not meet read quorum. -var errXLReadQuorum = errors.New("I/O error. did not meet read quorum.") - -// errXLWriteQuorum - did not meet write quorum. -var errXLWriteQuorum = errors.New("I/O error. did not meet write quorum.") - -// errXLDataCorrupt - err data corrupt. -var errXLDataCorrupt = errors.New("data likely corrupted, all blocks are zero in length") - -const ( - // Maximum erasure blocks. - maxErasureBlocks = 16 - // Minimum erasure blocks. - minErasureBlocks = 6 -) + // Object cache enabled. + objCacheEnabled bool +} // Validate if input disks are sufficient for initializing XL. func checkSufficientDisks(disks []string) error { @@ -174,7 +164,11 @@ func newXLObjects(disks []string) (ObjectLayer, error) { storageDisks: newPosixDisks, dataBlocks: dataBlocks, parityBlocks: parityBlocks, - listPool: newTreeWalkPool(globalLookupTimeout), + // Inititalize list pool. + listPool: newTreeWalkPool(globalLookupTimeout), + // Initialize object caching, FIXME: support auto cache expiration. + objCache: objcache.New(globalMaxCacheSize, objcache.NoExpiration), + objCacheEnabled: globalMaxCacheSize > 0, } // Figure out read and write quorum based on number of storage disks.