|
|
|
@ -31,24 +31,16 @@ type fsIOPool struct { |
|
|
|
|
readersMap map[string]*lock.RLockedFile |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Open is a wrapper call to read locked file which
|
|
|
|
|
// returns a ReadAtCloser.
|
|
|
|
|
//
|
|
|
|
|
// ReaderAt is provided so that the fd is non seekable, since
|
|
|
|
|
// we are sharing fd's with concurrent threads, we don't want
|
|
|
|
|
// all readers to change offsets on each other during such
|
|
|
|
|
// concurrent operations. Using ReadAt allows us to read from
|
|
|
|
|
// any offsets.
|
|
|
|
|
// lookupToRead - looks up an fd from readers map and
|
|
|
|
|
// returns read locked fd for caller to read from, if
|
|
|
|
|
// fd found increments the reference count. If the fd
|
|
|
|
|
// is found to be closed then purges it from the
|
|
|
|
|
// readersMap and returns nil instead.
|
|
|
|
|
//
|
|
|
|
|
// Closer is implemented to track total readers and to close
|
|
|
|
|
// only when there no more readers, the fd is purged if the lock
|
|
|
|
|
// count has reached zero.
|
|
|
|
|
func (fsi *fsIOPool) Open(path string) (*lock.RLockedFile, error) { |
|
|
|
|
if err := checkPathLength(path); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fsi.Lock() |
|
|
|
|
// NOTE: this function is not protected and it is callers
|
|
|
|
|
// responsibility to lock this call to be thread safe. For
|
|
|
|
|
// implementation ideas look at the usage inside Open() call.
|
|
|
|
|
func (fsi *fsIOPool) lookupToRead(path string) (*lock.RLockedFile, bool) { |
|
|
|
|
rlkFile, ok := fsi.readersMap[path] |
|
|
|
|
// File reference exists on map, validate if its
|
|
|
|
|
// really closed and we are safe to purge it.
|
|
|
|
@ -69,15 +61,33 @@ func (fsi *fsIOPool) Open(path string) (*lock.RLockedFile, error) { |
|
|
|
|
rlkFile.IncLockRef() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
fsi.Unlock() |
|
|
|
|
return rlkFile, ok |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Locked path reference doesn't exist, freshly open the file in
|
|
|
|
|
// read lock mode.
|
|
|
|
|
// Open is a wrapper call to read locked file which
|
|
|
|
|
// returns a ReadAtCloser.
|
|
|
|
|
//
|
|
|
|
|
// ReaderAt is provided so that the fd is non seekable, since
|
|
|
|
|
// we are sharing fd's with concurrent threads, we don't want
|
|
|
|
|
// all readers to change offsets on each other during such
|
|
|
|
|
// concurrent operations. Using ReadAt allows us to read from
|
|
|
|
|
// any offsets.
|
|
|
|
|
//
|
|
|
|
|
// Closer is implemented to track total readers and to close
|
|
|
|
|
// only when there no more readers, the fd is purged if the lock
|
|
|
|
|
// count has reached zero.
|
|
|
|
|
func (fsi *fsIOPool) Open(path string) (*lock.RLockedFile, error) { |
|
|
|
|
if err := checkPathLength(path); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fsi.Lock() |
|
|
|
|
rlkFile, ok := fsi.lookupToRead(path) |
|
|
|
|
fsi.Unlock() |
|
|
|
|
// Locked path reference doesn't exist, acquire a read lock again on the file.
|
|
|
|
|
if !ok { |
|
|
|
|
var err error |
|
|
|
|
var newRlkFile *lock.RLockedFile |
|
|
|
|
// Open file for reading.
|
|
|
|
|
newRlkFile, err = lock.RLockedOpenFile(path) |
|
|
|
|
// Open file for reading with read lock.
|
|
|
|
|
newRlkFile, err := lock.RLockedOpenFile(path) |
|
|
|
|
if err != nil { |
|
|
|
|
if os.IsNotExist(err) { |
|
|
|
|
return nil, errFileNotFound |
|
|
|
@ -93,34 +103,29 @@ func (fsi *fsIOPool) Open(path string) (*lock.RLockedFile, error) { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Save new reader on the map.
|
|
|
|
|
fsi.Lock() |
|
|
|
|
rlkFile, ok = fsi.readersMap[path] |
|
|
|
|
if ok && rlkFile != nil { |
|
|
|
|
// If the file is closed and not removed from map is a bug.
|
|
|
|
|
if rlkFile.IsClosed() { |
|
|
|
|
// Log this as an error.
|
|
|
|
|
errorIf(errUnexpected, "Unexpected entry found on the map %s", path) |
|
|
|
|
/// Save new reader on the map.
|
|
|
|
|
|
|
|
|
|
// Purge the cached lock path from map.
|
|
|
|
|
delete(fsi.readersMap, path) |
|
|
|
|
|
|
|
|
|
// Save the newly acquired read locked file.
|
|
|
|
|
rlkFile = newRlkFile |
|
|
|
|
} else { |
|
|
|
|
// Increment the lock ref, since the file is not closed yet
|
|
|
|
|
// and caller requested to read the file again.
|
|
|
|
|
rlkFile.IncLockRef() |
|
|
|
|
// It is possible by this time due to concurrent
|
|
|
|
|
// i/o we might have another lock present. Lookup
|
|
|
|
|
// again to check for such a possibility. If no such
|
|
|
|
|
// file exists save the newly opened fd, if not
|
|
|
|
|
// reuse the existing fd and close the newly opened
|
|
|
|
|
// file
|
|
|
|
|
fsi.Lock() |
|
|
|
|
rlkFile, ok = fsi.lookupToRead(path) |
|
|
|
|
if ok { |
|
|
|
|
// Close the new fd, since we already seem to have
|
|
|
|
|
// an active reference.
|
|
|
|
|
newRlkFile.Close() |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// Save the newly acquired read locked file.
|
|
|
|
|
// Save the new rlk file.
|
|
|
|
|
rlkFile = newRlkFile |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Save the rlkFile back on the map.
|
|
|
|
|
// Save the new fd on the map.
|
|
|
|
|
fsi.readersMap[path] = rlkFile |
|
|
|
|
fsi.Unlock() |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Success.
|
|
|
|
|