diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 5ccafbcce..de720791a 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -62,7 +62,7 @@ var _ = Suite(&MySuite{ var _ = Suite(&MySuite{ initDriver: func() (drivers.Driver, string) { - _, _, driver := memory.Start(1000) + _, _, driver := memory.Start(1000, 3*time.Hour) return driver, "" }, }) diff --git a/pkg/server/server.go b/pkg/server/server.go index 31726c1fd..4d3338775 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -27,6 +27,7 @@ import ( "github.com/minio-io/minio/pkg/storage/drivers/memory" "github.com/minio-io/minio/pkg/utils/log" "reflect" + "time" ) // MemoryFactory is used to build memory api servers @@ -38,7 +39,7 @@ type MemoryFactory struct { // GetStartServerFunc builds memory api servers func (f MemoryFactory) GetStartServerFunc() StartServerFunc { return func() (chan<- string, <-chan error) { - _, _, driver := memory.Start(f.MaxMemory) + _, _, driver := memory.Start(f.MaxMemory, 3*time.Hour) ctrl, status, _ := httpserver.Start(api.HTTPHandler(f.Domain, driver), f.Config) return ctrl, status } diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 489e901aa..89b4768a3 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -48,6 +48,8 @@ type memoryDriver struct { lock *sync.RWMutex totalSize uint64 maxSize uint64 + expiration time.Duration + shutdown bool } type storedBucket struct { @@ -65,7 +67,7 @@ const ( ) // Start memory object server -func Start(maxSize uint64) (chan<- string, <-chan error, drivers.Driver) { +func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan error, drivers.Driver) { ctrlChannel := make(chan string) errorChannel := make(chan error) @@ -75,6 +77,8 @@ func Start(maxSize uint64) (chan<- string, <-chan error, drivers.Driver) { memory.objectMetadata = make(map[string]storedObject) memory.objects = lru.New(0) memory.lock = new(sync.RWMutex) + memory.expiration = expiration + memory.shutdown = false switch { case maxSize == 0: @@ -87,6 +91,9 @@ func Start(maxSize uint64) (chan<- string, <-chan error, drivers.Driver) { memory.objects.OnEvicted = memory.evictObject + // set up memory expiration + go memory.expireObjects() + go start(ctrlChannel, errorChannel) return ctrlChannel, errorChannel, memory } @@ -470,3 +477,41 @@ func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) { log.Println("evicting:", k) delete(memory.objectMetadata, k) } + +func (memory *memoryDriver) expireObjects() { + for { + if memory.shutdown { + return + } + var keysToRemove []string + memory.lock.RLock() + var earliest time.Time + empty := true + for key, object := range memory.objectMetadata { + if empty { + empty = false + } + if time.Now().Add(-memory.expiration).After(object.metadata.Created) { + keysToRemove = append(keysToRemove, key) + } else { + if object.metadata.Created.Before(earliest) { + earliest = object.metadata.Created + } + } + } + memory.lock.RUnlock() + memory.lock.Lock() + for _, key := range keysToRemove { + memory.objects.Remove(key) + } + memory.lock.Unlock() + if empty { + time.Sleep(memory.expiration) + } else { + sleepFor := earliest.Sub(time.Now()) + if sleepFor > 0 { + time.Sleep(sleepFor) + } + } + } +} diff --git a/pkg/storage/drivers/memory/memory_test.go b/pkg/storage/drivers/memory/memory_test.go index bd79e0dd9..ed53a0577 100644 --- a/pkg/storage/drivers/memory/memory_test.go +++ b/pkg/storage/drivers/memory/memory_test.go @@ -21,6 +21,7 @@ import ( . "github.com/minio-io/check" "github.com/minio-io/minio/pkg/storage/drivers" + "time" ) func Test(t *testing.T) { TestingT(t) } @@ -31,7 +32,7 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { create := func() drivers.Driver { - _, _, store := Start(1000) + _, _, store := Start(1000, 3*time.Hour) return store } drivers.APITestSuite(c, create)