server: Add global namespace lock. (#1398)

Fixes #1393
master
Harshavardhana 9 years ago committed by Anand Babu (AB) Periasamy
parent 8deddb82f4
commit 984903cce1
  1. 3
      main.go
  2. 128
      namespace-lock.go
  3. 4
      object-api_test.go
  4. 3
      server_test.go
  5. 10
      xl-v1-createfile.go
  6. 7
      xl-v1-healfile.go
  7. 71
      xl-v1-namespace.go
  8. 9
      xl-v1-readfile.go
  9. 52
      xl-v1.go

@ -202,6 +202,9 @@ func main() {
// Enable all loggers by now. // Enable all loggers by now.
enableLoggers() enableLoggers()
// Initialize name space lock.
initNSLock()
// Do not print update messages, if quiet flag is set. // Do not print update messages, if quiet flag is set.
if !globalQuiet { if !globalQuiet {
// Do not print any errors in release update function. // Do not print any errors in release update function.

@ -0,0 +1,128 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "sync"
// nsParam - carries name space resource.
type nsParam struct {
volume string
path string
}
// nsLock - provides primitives for locking critical namespace regions.
type nsLock struct {
*sync.RWMutex
ref uint
}
// nsLockMap - namespace lock map, provides primitives to Lock,
// Unlock, RLock and RUnlock.
type nsLockMap struct {
lockMap map[nsParam]*nsLock
mutex *sync.Mutex
}
// Global name space lock.
var nsMutex *nsLockMap
// initNSLock - initialize name space lock map.
func initNSLock() {
nsMutex = &nsLockMap{
lockMap: make(map[nsParam]*nsLock),
mutex: &sync.Mutex{},
}
}
// Lock - locks the given resource for writes, using a previously
// allocated name space lock or initializing a new one.
func (n *nsLockMap) Lock(volume, path string) {
n.mutex.Lock()
defer n.mutex.Unlock()
param := nsParam{volume, path}
nsLk, found := n.lockMap[param]
if !found {
nsLk = &nsLock{
RWMutex: &sync.RWMutex{},
ref: 0,
}
}
// Acquire a write lock and update reference counter.
nsLk.Lock()
nsLk.ref++
n.lockMap[param] = nsLk
}
// Unlock - unlocks any previously acquired write locks.
func (n *nsLockMap) Unlock(volume, path string) {
n.mutex.Lock()
defer n.mutex.Unlock()
param := nsParam{volume, path}
if nsLk, found := n.lockMap[param]; found {
// Unlock a write lock and update reference counter.
nsLk.Unlock()
if nsLk.ref != 0 {
nsLk.ref--
}
if nsLk.ref != 0 {
n.lockMap[param] = nsLk
}
}
}
// RLock - locks any previously acquired read locks.
func (n *nsLockMap) RLock(volume, path string) {
n.mutex.Lock()
defer n.mutex.Unlock()
param := nsParam{volume, path}
nsLk, found := n.lockMap[param]
if !found {
nsLk = &nsLock{
RWMutex: &sync.RWMutex{},
ref: 0,
}
}
// Acquire a read lock and update reference counter.
nsLk.RLock()
nsLk.ref++
n.lockMap[param] = nsLk
}
// RUnlock - unlocks any previously acquired read locks.
func (n *nsLockMap) RUnlock(volume, path string) {
n.mutex.Lock()
defer n.mutex.Unlock()
param := nsParam{volume, path}
if nsLk, found := n.lockMap[param]; found {
// Unlock a read lock and update reference counter.
nsLk.RUnlock()
if nsLk.ref != 0 {
nsLk.ref--
}
if nsLk.ref != 0 {
n.lockMap[param] = nsLk
}
}
}

@ -44,6 +44,10 @@ func (s *MySuite) TestFSAPISuite(c *C) {
func (s *MySuite) TestXLAPISuite(c *C) { func (s *MySuite) TestXLAPISuite(c *C) {
var storageList []string var storageList []string
// Initialize name space lock.
initNSLock()
create := func() objectAPI { create := func() objectAPI {
var nDisks = 16 // Maximum disks. var nDisks = 16 // Maximum disks.
var erasureDisks []string var erasureDisks []string

@ -81,6 +81,9 @@ func (s *MyAPISuite) SetUpSuite(c *C) {
// Initialize server config. // Initialize server config.
initConfig() initConfig()
// Initialize name space lock.
initNSLock()
// Set port. // Set port.
addr := ":" + strconv.Itoa(getFreePort()) addr := ":" + strconv.Itoa(getFreePort())

@ -58,10 +58,9 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
defer wcloser.release() defer wcloser.release()
// Lock right before reading from disk. // Lock right before reading from disk.
readLock := true nsMutex.RLock(volume, path)
xl.lockNS(volume, path, readLock)
partsMetadata, errs := xl.getPartsMetadata(volume, path) partsMetadata, errs := xl.getPartsMetadata(volume, path)
xl.unlockNS(volume, path, readLock) nsMutex.RUnlock(volume, path)
// Count errors other than fileNotFound, bigger than the allowed // Count errors other than fileNotFound, bigger than the allowed
// readQuorum, if yes throw an error. // readQuorum, if yes throw an error.
@ -263,9 +262,8 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
} }
// Lock right before commit to disk. // Lock right before commit to disk.
readLock = false // false means writeLock. nsMutex.Lock(volume, path)
xl.lockNS(volume, path, readLock) defer nsMutex.Unlock(volume, path)
defer xl.unlockNS(volume, path, readLock)
// Close all writers and metadata writers in routines. // Close all writers and metadata writers in routines.
for index, writer := range writers { for index, writer := range writers {

@ -33,11 +33,10 @@ func (xl XL) healFile(volume string, path string) error {
var writers = make([]io.WriteCloser, totalBlocks) var writers = make([]io.WriteCloser, totalBlocks)
// Acquire a read lock. // Acquire a read lock.
readLock := true nsMutex.RLock(volume, path)
xl.lockNS(volume, path, readLock) defer nsMutex.RUnlock(volume, path)
defer xl.unlockNS(volume, path, readLock)
// Fetch all online disks. // List all online disks to verify if we need to heal.
onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path) onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{

@ -1,71 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "sync"
// nameSpaceParam - carries name space resource.
type nameSpaceParam struct {
volume string
path string
}
// nameSpaceLock - provides primitives for locking critical namespace regions.
type nameSpaceLock struct {
rwMutex *sync.RWMutex
count uint
}
func (nsLock *nameSpaceLock) InUse() bool {
return nsLock.count != 0
}
// Lock acquires write lock and increments the namespace counter.
func (nsLock *nameSpaceLock) Lock() {
nsLock.rwMutex.Lock()
nsLock.count++
}
// Unlock releases write lock and decrements the namespace counter.
func (nsLock *nameSpaceLock) Unlock() {
nsLock.rwMutex.Unlock()
if nsLock.count != 0 {
nsLock.count--
}
}
// RLock acquires read lock and increments the namespace counter.
func (nsLock *nameSpaceLock) RLock() {
nsLock.rwMutex.RLock()
nsLock.count++
}
// RUnlock release read lock and decrements the namespace counter.
func (nsLock *nameSpaceLock) RUnlock() {
nsLock.rwMutex.RUnlock()
if nsLock.count != 0 {
nsLock.count--
}
}
// newNSLock - provides a new instance of namespace locking primitives.
func newNSLock() *nameSpaceLock {
return &nameSpaceLock{
rwMutex: &sync.RWMutex{},
count: 0,
}
}

@ -36,10 +36,9 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
} }
// Acquire a read lock. // Acquire a read lock.
readLock := true nsMutex.RLock(volume, path)
xl.lockNS(volume, path, readLock)
onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path) onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path)
xl.unlockNS(volume, path, readLock) nsMutex.RUnlock(volume, path)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"volume": volume, "volume": volume,
@ -63,7 +62,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
} }
// Acquire read lock again. // Acquire read lock again.
xl.lockNS(volume, path, readLock) nsMutex.RLock(volume, path)
readers := make([]io.ReadCloser, len(xl.storageDisks)) readers := make([]io.ReadCloser, len(xl.storageDisks))
for index, disk := range onlineDisks { for index, disk := range onlineDisks {
if disk == nil { if disk == nil {
@ -77,7 +76,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
readers[index] = reader readers[index] = reader
} }
} }
xl.unlockNS(volume, path, readLock) nsMutex.RUnlock(volume, path)
// Initialize pipe. // Initialize pipe.
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()

@ -22,7 +22,6 @@ import (
slashpath "path" slashpath "path"
"sort" "sort"
"strings" "strings"
"sync"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/klauspost/reedsolomon" "github.com/klauspost/reedsolomon"
@ -41,52 +40,10 @@ type XL struct {
DataBlocks int DataBlocks int
ParityBlocks int ParityBlocks int
storageDisks []StorageAPI storageDisks []StorageAPI
nameSpaceLockMap map[nameSpaceParam]*nameSpaceLock
nameSpaceLockMapMutex *sync.Mutex
readQuorum int readQuorum int
writeQuorum int writeQuorum int
} }
// lockNS - locks the given resource, using a previously allocated
// name space lock or initializing a new one.
func (xl XL) lockNS(volume, path string, readLock bool) {
xl.nameSpaceLockMapMutex.Lock()
defer xl.nameSpaceLockMapMutex.Unlock()
param := nameSpaceParam{volume, path}
nsLock, found := xl.nameSpaceLockMap[param]
if !found {
nsLock = newNSLock()
}
if readLock {
nsLock.RLock()
} else {
nsLock.Lock()
}
xl.nameSpaceLockMap[param] = nsLock
}
// unlockNS - unlocks any previously acquired read or write locks.
func (xl XL) unlockNS(volume, path string, readLock bool) {
xl.nameSpaceLockMapMutex.Lock()
defer xl.nameSpaceLockMapMutex.Unlock()
param := nameSpaceParam{volume, path}
if nsLock, found := xl.nameSpaceLockMap[param]; found {
if readLock {
nsLock.RUnlock()
} else {
nsLock.Unlock()
}
if nsLock.InUse() {
xl.nameSpaceLockMap[param] = nsLock
}
}
}
// newXL instantiate a new XL. // newXL instantiate a new XL.
func newXL(disks ...string) (StorageAPI, error) { func newXL(disks ...string) (StorageAPI, error) {
// Initialize XL. // Initialize XL.
@ -135,10 +92,6 @@ func newXL(disks ...string) (StorageAPI, error) {
// Save all the initialized storage disks. // Save all the initialized storage disks.
xl.storageDisks = storageDisks xl.storageDisks = storageDisks
// Initialize name space lock map.
xl.nameSpaceLockMap = make(map[nameSpaceParam]*nameSpaceLock)
xl.nameSpaceLockMapMutex = &sync.Mutex{}
// Figure out read and write quorum based on number of storage disks. // Figure out read and write quorum based on number of storage disks.
// Read quorum should be always N/2 + 1 (due to Vandermonde matrix // Read quorum should be always N/2 + 1 (due to Vandermonde matrix
// erasure requirements) // erasure requirements)
@ -552,10 +505,9 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
} }
// Acquire read lock. // Acquire read lock.
readLock := true nsMutex.RLock(volume, path)
xl.lockNS(volume, path, readLock)
_, metadata, heal, err := xl.listOnlineDisks(volume, path) _, metadata, heal, err := xl.listOnlineDisks(volume, path)
xl.unlockNS(volume, path, readLock) nsMutex.RUnlock(volume, path)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"volume": volume, "volume": volume,

Loading…
Cancel
Save