lock: improve locker initialization at init (#8776)

Use reference format to initialize lockers
during startup, also handle `nil` for NetLocker
in dsync and remove *errorLocker* implementation

Add further tuning parameters such as

 - DialTimeout is now 15 seconds from 30 seconds
 - KeepAliveTimeout is not 20 seconds, 5 seconds
   more than default 15 seconds
 - ResponseHeaderTimeout to 10 seconds
 - ExpectContinueTimeout is reduced to 3 seconds
 - DualStack is enabled by default remove setting
   it to `true`
 - Reduce IdleConnTimeout to 30 seconds from
   1 minute to avoid idleConn build up

Fixes #8773
master
Harshavardhana 5 years ago committed by GitHub
parent 0a70bc24ac
commit 5aa5dcdc6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      .travis.yml
  2. 6
      Dockerfile.simpleci
  3. 12
      Makefile
  4. 5
      buildscripts/go-coverage.sh
  5. 7
      buildscripts/race.sh
  6. 197
      buildscripts/verify-healing.sh
  7. 25
      cmd/admin-heal-ops.go
  8. 1
      cmd/background-heal-ops.go
  9. 8
      cmd/background-newdisks-heal-ops.go
  10. 25
      cmd/config-encrypted.go
  11. 10
      cmd/endpoint.go
  12. 35
      cmd/local-locker.go
  13. 4
      cmd/lock-rest-client.go
  14. 6
      cmd/object-api-multipart_test.go
  15. 3
      cmd/peer-rest-server.go
  16. 2
      cmd/posix.go
  17. 19
      cmd/server-main.go
  18. 16
      cmd/utils.go
  19. 54
      cmd/xl-sets.go
  20. 32
      cmd/xl-v1.go
  21. 67
      cmd/xl-v1_test.go
  22. 13
      pkg/dsync/drwmutex.go

@ -32,8 +32,7 @@ matrix:
- make
- diff -au <(gofmt -s -d cmd) <(printf "")
- diff -au <(gofmt -s -d pkg) <(printf "")
- for d in $(go list ./... | grep -v browser); do CGO_ENABLED=1 go test -v -race --timeout 20m "$d"; done
- make verifiers
- make test-race
- make crosscompile
- make verify
- cd browser && npm install && npm run test && cd ..

@ -34,11 +34,13 @@ USER ci
RUN make
RUN bash -c 'diff -au <(gofmt -s -d cmd) <(printf "")'
RUN bash -c 'diff -au <(gofmt -s -d pkg) <(printf "")'
RUN for d in $(go list ./... | grep -v browser); do go test -v -race --timeout 20m "$d"; done
RUN make verifiers
RUN make test-race
RUN make crosscompile
RUN make verify
## -- add healing tests
RUN make verify-healing
#-------------------------------------------------------------
# Stage 2: Test Frontend
#-------------------------------------------------------------

@ -64,15 +64,21 @@ test: verifiers build
@echo "Running unit tests"
@GO111MODULE=on CGO_ENABLED=0 go test -tags kqueue ./... 1>/dev/null
test-race: verifiers build
@echo "Running unit tests under -race"
@(env bash $(PWD)/buildscripts/race.sh)
# Verify minio binary
verify:
@echo "Verifying build with race"
@GO111MODULE=on CGO_ENABLED=1 go build -race -tags kqueue --ldflags $(BUILD_LDFLAGS) -o $(PWD)/minio 1>/dev/null
@(env bash $(PWD)/buildscripts/verify-build.sh)
coverage: build
@echo "Running all coverage for minio"
@(env bash $(PWD)/buildscripts/go-coverage.sh)
# Verify healing of disks with minio binary
verify-healing:
@echo "Verify healing build with race"
@GO111MODULE=on CGO_ENABLED=1 go build -race -tags kqueue --ldflags $(BUILD_LDFLAGS) -o $(PWD)/minio 1>/dev/null
@(env bash $(PWD)/buildscripts/verify-healing.sh)
# Builds minio locally.
build: checks

@ -1,5 +0,0 @@
#!/usr/bin/env bash
set -e
GO111MODULE=on CGO_ENABLED=0 go test -v -coverprofile=coverage.txt -covermode=atomic ./...

@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -e
for d in $(go list ./... | grep -v browser); do
CGO_ENABLED=1 go test -v -race --timeout 20m "$d"
done

@ -0,0 +1,197 @@
#!/bin/bash
#
# MinIO Cloud Storage, (C) 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -e
set -E
set -o pipefail
if [ ! -x "$PWD/minio" ]; then
echo "minio executable binary not found in current directory"
exit 1
fi
WORK_DIR="$PWD/.verify-$RANDOM"
MINIO_CONFIG_DIR="$WORK_DIR/.minio"
MINIO=( "$PWD/minio" --config-dir "$MINIO_CONFIG_DIR" )
function start_minio_3_node() {
declare -a minio_pids
export MINIO_ACCESS_KEY=minio
export MINIO_SECRET_KEY=minio123
for i in $(seq 1 3); do
ARGS+=("http://127.0.0.1:$[9000+$i]${WORK_DIR}/$i/1/ http://127.0.0.1:$[9000+$i]${WORK_DIR}/$i/2/ http://127.0.0.1:$[9000+$i]${WORK_DIR}/$i/3/ http://127.0.0.1:$[9000+$i]${WORK_DIR}/$i/4/ http://127.0.0.1:$[9000+$i]${WORK_DIR}/$i/5/ http://127.0.0.1:$[9000+$i]${WORK_DIR}/$i/6/")
done
"${MINIO[@]}" server --address ":9001" ${ARGS[@]} > "${WORK_DIR}/dist-minio-9001.log" 2>&1 &
minio_pids[0]=$!
"${MINIO[@]}" server --address ":9002" ${ARGS[@]} > "${WORK_DIR}/dist-minio-9002.log" 2>&1 &
minio_pids[1]=$!
"${MINIO[@]}" server --address ":9003" ${ARGS[@]} > "${WORK_DIR}/dist-minio-9003.log" 2>&1 &
minio_pids[2]=$!
sleep "$1"
echo "${minio_pids[@]}"
}
function check_online() {
for i in $(seq 1 3); do
if grep -q 'Server switching to safe mode' ${WORK_DIR}/dist-minio-$[9000+$i].log; then
echo "1"
fi
done
}
function purge()
{
rm -rf "$1"
}
function __init__()
{
echo "Initializing environment"
mkdir -p "$WORK_DIR"
mkdir -p "$MINIO_CONFIG_DIR"
## version is purposefully set to '3' for minio to migrate configuration file
echo '{"version": "3", "credential": {"accessKey": "minio", "secretKey": "minio123"}, "region": "us-east-1"}' > "$MINIO_CONFIG_DIR/config.json"
}
function perform_test_1() {
minio_pids=( $(start_minio_3_node 20) )
for pid in "${minio_pids[@]}"; do
kill "$pid"
done
echo "Testing in Distributed Erasure setup healing test case 1"
echo "Remove the contents of the disks belonging to '2' erasure set"
rm -rf ${WORK_DIR}/2/*
minio_pids=( $(start_minio_3_node 60) )
for pid in "${minio_pids[@]}"; do
if ! kill "$pid"; then
cat "${WORK_DIR}/*.log"
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
done
rv=$(check_online)
if [ "$rv" == "1" ]; then
for pid in "${minio_pids[@]}"; do
kill -9 "$pid"
done
for i in $(seq 1 3); do
echo "server$i log:"
cat "${WORK_DIR}/dist-minio-$[9000+$i].log"
done
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
}
function perform_test_2() {
minio_pids=( $(start_minio_3_node 20) )
for pid in "${minio_pids[@]}"; do
kill "$pid"
done
echo "Testing in Distributed Erasure setup healing test case 2"
echo "Remove the contents of the disks belonging to '1' erasure set"
rm -rf ${WORK_DIR}/1/*/
minio_pids=( $(start_minio_3_node 60) )
for pid in "${minio_pids[@]}"; do
if ! kill "$pid"; then
cat "${WORK_DIR}/*.log"
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
done
rv=$(check_online)
if [ "$rv" == "1" ]; then
for pid in "${minio_pids[@]}"; do
kill -9 "$pid"
done
for i in $(seq 1 3); do
echo "server$i log:"
cat "${WORK_DIR}/dist-minio-$[9000+$i].log"
done
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
}
function perform_test_3() {
minio_pids=( $(start_minio_3_node 20) )
for pid in "${minio_pids[@]}"; do
kill "$pid"
done
echo "Testing in Distributed Erasure setup healing test case 3"
echo "Remove the contents of the disks belonging to '3' erasure set"
rm -rf ${WORK_DIR}/3/*/
minio_pids=( $(start_minio_3_node 60) )
for pid in "${minio_pids[@]}"; do
if ! kill "$pid"; then
cat "${WORK_DIR}/*.log"
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
done
rv=$(check_online)
if [ "$rv" == "1" ]; then
for pid in "${minio_pids[@]}"; do
kill -9 "$pid"
done
for i in $(seq 1 3); do
echo "server$i log:"
cat "${WORK_DIR}/dist-minio-$[9000+$i].log"
done
echo "FAILED"
purge "$WORK_DIR"
exit 1
fi
}
function main()
{
perform_test_1
perform_test_2
perform_test_3
}
( __init__ "$@" && main "$@" )
rv=$?
purge "$WORK_DIR"
exit "$rv"

@ -577,23 +577,13 @@ func (h *healSequence) queueHealTask(path string, healType madmin.HealItemType)
func (h *healSequence) healItemsFromSourceCh() error {
h.lastHealActivity = UTCNow()
// Start healing the config prefix.
if err := h.healMinioSysMeta(minioConfigPrefix)(); err != nil {
logger.LogIf(h.ctx, err)
}
// Start healing the bucket config prefix.
if err := h.healMinioSysMeta(bucketConfigPrefix)(); err != nil {
logger.LogIf(h.ctx, err)
}
// Start healing the background ops prefix.
if err := h.healMinioSysMeta(backgroundOpsMetaPrefix)(); err != nil {
if err := h.healItems(); err != nil {
logger.LogIf(h.ctx, err)
}
for path := range h.sourceCh {
for {
select {
case path := <-h.sourceCh:
var itemType madmin.HealItemType
switch {
case path == nopHeal:
@ -612,9 +602,12 @@ func (h *healSequence) healItemsFromSourceCh() error {
h.scannedItemsCount++
h.lastHealActivity = UTCNow()
}
case <-h.traverseAndHealDoneCh:
return nil
case <-GlobalServiceDoneCh:
return nil
}
}
}
func (h *healSequence) healFromSourceCh() {

@ -61,6 +61,7 @@ func (h *healRoutine) run() {
if !ok {
break
}
if httpServer := newHTTPServerFn(); httpServer != nil {
// Wait at max 10 minute for an inprogress request before proceeding to heal
waitCount := 600

@ -62,10 +62,9 @@ func monitorLocalDisksAndHeal() {
time.Sleep(time.Second)
}
// Perform automatic disk healing when a new one is inserted
// Perform automatic disk healing when a disk is replaced locally.
for {
time.Sleep(defaultMonitorNewDiskInterval)
// Attempt a heal as the server starts-up first.
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
for i, ep := range globalEndpoints {
localDisksToHeal := Endpoints{}
@ -88,9 +87,12 @@ func monitorLocalDisksAndHeal() {
// Reformat disks
bgSeq.sourceCh <- SlashSeparator
// Ensure that reformatting disks is finished
bgSeq.sourceCh <- nopHeal
time.Sleep(defaultMonitorNewDiskInterval)
var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
// Compute the list of erasure set to heal
for i, localDisksToHeal := range localDisksInZoneHeal {

@ -84,7 +84,7 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
return nil
}
if !globalActiveCred.IsValid() {
return errInvalidArgument
return config.ErrMissingCredentialsBackendEncrypted(nil)
}
}
@ -267,7 +267,12 @@ func migrateIAMConfigsEtcdToEncrypted(client *etcd.Client) error {
}
if !utf8.Valid(data) {
return errors.New("config data not in plain-text form")
_, err = decryptData(data, globalActiveCred)
if err == nil {
// Config is already encrypted with right keys
continue
}
return errors.New("config data not in plain-text form or encrypted")
}
cencdata, err = madmin.EncryptData(globalActiveCred.String(), data)
@ -279,9 +284,11 @@ func migrateIAMConfigsEtcdToEncrypted(client *etcd.Client) error {
return err
}
}
if encrypted && globalActiveCred.IsValid() {
if encrypted && globalActiveCred.IsValid() && activeCredOld.IsValid() {
logger.Info("Rotation complete, please make sure to unset MINIO_ACCESS_KEY_OLD and MINIO_SECRET_KEY_OLD envs")
}
return saveKeyEtcd(ctx, client, backendEncryptedFile, backendEncryptedMigrationComplete)
}
@ -309,7 +316,8 @@ func migrateConfigPrefixToEncrypted(objAPI ObjectLayer, activeCredOld auth.Crede
var marker string
for {
res, err := objAPI.ListObjects(context.Background(), minioMetaBucket, minioConfigPrefix, marker, "", maxObjectList)
res, err := objAPI.ListObjects(context.Background(), minioMetaBucket,
minioConfigPrefix, marker, "", maxObjectList)
if err != nil {
return err
}
@ -339,7 +347,12 @@ func migrateConfigPrefixToEncrypted(objAPI ObjectLayer, activeCredOld auth.Crede
}
if !utf8.Valid(data) {
return errors.New("config data not in plain-text form")
_, err = decryptData(data, globalActiveCred)
if err == nil {
// Config is already encrypted with right keys
continue
}
return errors.New("config data not in plain-text form or encrypted")
}
cencdata, err = madmin.EncryptData(globalActiveCred.String(), data)
@ -359,7 +372,7 @@ func migrateConfigPrefixToEncrypted(objAPI ObjectLayer, activeCredOld auth.Crede
marker = res.NextMarker
}
if encrypted && globalActiveCred.IsValid() {
if encrypted && globalActiveCred.IsValid() && activeCredOld.IsValid() {
logger.Info("Rotation complete, please make sure to unset MINIO_ACCESS_KEY_OLD and MINIO_SECRET_KEY_OLD envs")
}

@ -54,7 +54,6 @@ const (
type Endpoint struct {
*url.URL
IsLocal bool
SetIndex int
}
func (endpoint Endpoint) String() string {
@ -535,9 +534,8 @@ func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endp
return endpoints, setupType, nil
}
for i, iargs := range args {
for _, iargs := range args {
// Convert args to endpoints
var newEndpoints Endpoints
eps, err := NewEndpoints(iargs...)
if err != nil {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
@ -548,11 +546,7 @@ func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endp
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
}
for _, ep := range eps {
ep.SetIndex = i
newEndpoints = append(newEndpoints, ep)
}
endpoints = append(endpoints, newEndpoints...)
endpoints = append(endpoints, eps...)
}
if len(endpoints) == 0 {

@ -17,7 +17,6 @@
package cmd
import (
"errors"
"fmt"
"sync"
"time"
@ -39,40 +38,6 @@ func isWriteLock(lri []lockRequesterInfo) bool {
return len(lri) == 1 && lri[0].Writer
}
type errorLocker struct{}
func (d *errorLocker) String() string {
return ""
}
func (d *errorLocker) Lock(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to lock")
}
func (d *errorLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to unlock")
}
func (d *errorLocker) RLock(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to rlock")
}
func (d *errorLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to runlock")
}
func (d *errorLocker) Close() error {
return nil
}
func (d *errorLocker) IsOnline() bool {
return false
}
func (d *errorLocker) Expired(args dsync.LockArgs) (reply bool, err error) {
return false, errors.New("unable to check for lock expiration")
}
// localLocker implements Dsync.NetLocker
type localLocker struct {
mutex sync.Mutex

@ -76,8 +76,8 @@ func (client *lockRESTClient) call(method string, values url.Values, body io.Rea
}
if isNetworkError(err) {
time.AfterFunc(defaultRetryUnit*3, func() {
// After 3 seconds, take this lock client online for a retry.
time.AfterFunc(defaultRetryUnit, func() {
// After 1 seconds, take this lock client online for a retry.
atomic.StoreInt32(&client.connected, 1)
})

@ -238,12 +238,6 @@ func testPutObjectPartDiskNotFound(obj ObjectLayer, instanceType string, disks [
t.Fatalf("Test %s: expected to fail but passed instead", instanceType)
}
// as majority of xl.json are not available, we expect uploadID to be not available.
expectedErr1 := BucketNotFound{Bucket: testCase.bucketName}
if err.Error() != expectedErr1.Error() {
t.Fatalf("Test %s: expected error %s, got %s instead.", instanceType, expectedErr1, err)
}
// This causes invalid upload id.
for _, disk := range disks {
os.RemoveAll(disk)

@ -563,7 +563,7 @@ func (s *peerRESTServer) ReloadFormatHandler(w http.ResponseWriter, r *http.Requ
vars := mux.Vars(r)
dryRunString := vars[peerRESTDryRun]
if dryRunString == "" {
s.writeErrorResponse(w, errors.New("dry run parameter is missing"))
s.writeErrorResponse(w, errors.New("dry-run parameter is missing"))
return
}
@ -583,6 +583,7 @@ func (s *peerRESTServer) ReloadFormatHandler(w http.ResponseWriter, r *http.Requ
s.writeErrorResponse(w, errServerNotInitialized)
return
}
err := objAPI.ReloadFormat(context.Background(), dryRun)
if err != nil {
s.writeErrorResponse(w, err)

@ -554,9 +554,11 @@ func (s *posix) SetDiskID(id string) {
func (s *posix) MakeVolBulk(volumes ...string) (err error) {
for _, volume := range volumes {
if err = s.MakeVol(volume); err != nil {
if err != errVolumeExists {
return err
}
}
}
return nil
}

@ -200,9 +200,6 @@ func initSafeMode(buckets []BucketInfo) (err error) {
}
}(objLock)
// Calls New() and initializes all sub-systems.
newAllSubsystems()
// Migrate all backend configs to encrypted backend configs, optionally
// handles rotating keys for encryption.
if err = handleEncryptedConfigBackend(newObject, true); err != nil {
@ -365,6 +362,16 @@ func serverMain(ctx *cli.Context) {
globalObjectAPI = newObject
globalObjLayerMutex.Unlock()
// Calls New() and initializes all sub-systems.
newAllSubsystems()
// Enable healing to heal drives if possible
if globalIsXL {
initBackgroundHealing()
initLocalDisksAutoHeal()
initGlobalHeal()
}
buckets, err := newObject.ListBuckets(context.Background())
if err != nil {
logger.Fatal(err, "Unable to list buckets")
@ -391,12 +398,6 @@ func serverMain(ctx *cli.Context) {
initDataUsageStats()
initDailyLifecycle()
if globalIsXL {
initBackgroundHealing()
initLocalDisksAutoHeal()
initGlobalHeal()
}
// Disable safe mode operation, after all initialization is over.
globalObjLayerMutex.Lock()
globalSafeMode = false

@ -145,9 +145,10 @@ const (
// (Acceptable values range from 1 to 10000 inclusive)
globalMaxPartID = 10000
// Default values used while communicating with the cloud backends
defaultDialTimeout = 30 * time.Second
defaultDialKeepAlive = 30 * time.Second
// Default values used while communicating for
// internode communication.
defaultDialTimeout = 15 * time.Second
defaultDialKeepAlive = 20 * time.Second
)
// isMaxObjectSize - verify if max object size
@ -350,7 +351,6 @@ func newCustomDialContext(dialTimeout, dialKeepAlive time.Duration) dialContext
dialer := &net.Dialer{
Timeout: dialTimeout,
KeepAlive: dialKeepAlive,
DualStack: true,
}
return dialer.DialContext(ctx, network, addr)
@ -363,10 +363,12 @@ func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout, dialKeepAlive ti
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: newCustomDialContext(dialTimeout, dialKeepAlive),
MaxIdleConns: 256,
MaxIdleConnsPerHost: 256,
IdleConnTimeout: 60 * time.Second,
TLSHandshakeTimeout: 30 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 3 * time.Second,
TLSClientConfig: tlsConfig,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested

@ -74,9 +74,6 @@ type xlSets struct {
// Distributed locker clients.
xlLockers setsDsyncLockers
// Lockers map holds dsync lockers for each endpoint
lockersMap map[Endpoint]dsync.NetLocker
// List of endpoints provided on the command line.
endpoints Endpoints
@ -112,9 +109,6 @@ func (s *xlSets) isConnected(endpoint Endpoint) bool {
if s.xlDisks[i][j].String() != endpointStr {
continue
}
if !s.xlLockers[i][j].IsOnline() {
continue
}
return s.xlDisks[i][j].IsOnline()
}
}
@ -161,9 +155,9 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) {
return -1, -1, fmt.Errorf("diskID: %s not found", format.XL.This)
}
// connectDisksAndLockersWithQuorum is same as connectDisksAndLockers but waits
// connectDisksWithQuorum is same as connectDisks but waits
// for quorum number of formatted disks to be online in any given sets.
func (s *xlSets) connectDisksAndLockersWithQuorum() {
func (s *xlSets) connectDisksWithQuorum() {
var onlineDisks int
for onlineDisks < len(s.endpoints)/2 {
for _, endpoint := range s.endpoints {
@ -184,7 +178,6 @@ func (s *xlSets) connectDisksAndLockersWithQuorum() {
}
disk.SetDiskID(format.XL.This)
s.xlDisks[i][j] = disk
s.xlLockers[i][j] = s.lockersMap[endpoint]
onlineDisks++
}
// Sleep for a while - so that we don't go into
@ -193,9 +186,9 @@ func (s *xlSets) connectDisksAndLockersWithQuorum() {
}
}
// connectDisksAndLockers - attempt to connect all the endpoints, loads format
// connectDisks - attempt to connect all the endpoints, loads format
// and re-arranges the disks in proper position.
func (s *xlSets) connectDisksAndLockers() {
func (s *xlSets) connectDisks() {
for _, endpoint := range s.endpoints {
if s.isConnected(endpoint) {
continue
@ -215,7 +208,6 @@ func (s *xlSets) connectDisksAndLockers() {
disk.SetDiskID(format.XL.This)
s.xlDisksMu.Lock()
s.xlDisks[i][j] = disk
s.xlLockers[i][j] = s.lockersMap[endpoint]
s.xlDisksMu.Unlock()
}
}
@ -235,7 +227,7 @@ func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) {
case <-s.disksConnectDoneCh:
return
case <-ticker.C:
s.connectDisksAndLockers()
s.connectDisks()
}
}
}
@ -246,12 +238,6 @@ func (s *xlSets) GetLockers(setIndex int) func() []dsync.NetLocker {
defer s.xlDisksMu.Unlock()
lockers := make([]dsync.NetLocker, s.drivesPerSet)
copy(lockers, s.xlLockers[setIndex])
for i, lk := range lockers {
// Add error lockers for unavailable locker.
if lk == nil {
lockers[i] = &errorLocker{}
}
}
return lockers
}
}
@ -271,17 +257,11 @@ const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs
// Initialize new set of erasure coded sets.
func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerSet int) (*xlSets, error) {
lockersMap := make(map[Endpoint]dsync.NetLocker)
for _, endpoint := range endpoints {
lockersMap[endpoint] = newLockAPI(endpoint)
}
// Initialize the XL sets instance.
s := &xlSets{
sets: make([]*xlObjects, setCount),
xlDisks: make([][]StorageAPI, setCount),
xlLockers: make([][]dsync.NetLocker, setCount),
lockersMap: lockersMap,
endpoints: endpoints,
setCount: setCount,
drivesPerSet: drivesPerSet,
@ -308,12 +288,20 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS
nsMutex: mutex,
bp: bp,
}
go s.sets[i].cleanupStaleMultipartUploads(context.Background(),
GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh)
}
// Rely on endpoints list to initialize, init lockers.
for i := 0; i < s.setCount; i++ {
for j := 0; j < s.drivesPerSet; j++ {
s.xlLockers[i][j] = newLockAPI(s.endpoints[i*s.drivesPerSet+j])
}
}
// Connect disks right away, but wait until we have `format.json` quorum.
s.connectDisksAndLockersWithQuorum()
s.connectDisksWithQuorum()
// Start the disk monitoring and connect routine.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
@ -1312,9 +1300,9 @@ func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
// Replace the new format.
s.format = refFormat
// Close all existing disks, lockers and reconnect all the disks/lockers.
// Close all existing disks and reconnect all the disks.
s.xlDisks.Close()
s.connectDisksAndLockers()
s.connectDisks()
// Restart monitoring loop to monitor reformatted disks again.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
@ -1435,11 +1423,6 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
return res, errNoHealRequired
}
// All disks are unformatted, return quorum error.
if shouldInitXLDisks(sErrs) {
return res, errXLReadQuorum
}
refFormat, err := getFormatXLInQuorum(formats)
if err != nil {
return res, err
@ -1506,7 +1489,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
// Disconnect/relinquish all existing disks, lockers and reconnect the disks, lockers.
s.xlDisks.Close()
s.connectDisksAndLockers()
s.connectDisks()
// Restart our monitoring loop to start monitoring newly formatted disks.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
@ -1668,9 +1651,6 @@ func (s *xlSets) IsReady(_ context.Context) bool {
if s.xlDisks[i][j] == nil {
continue
}
if !s.xlLockers[i][j].IsOnline() {
continue
}
if s.xlDisks[i][j].IsOnline() {
activeDisks++
}

@ -123,7 +123,6 @@ func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks, offlin
// Wait for the routines.
for i, err := range g.Wait() {
peerAddr, pErr := getPeerAddress(disksInfo[i].RelativePath)
if pErr != nil {
continue
}
@ -144,39 +143,20 @@ func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks, offlin
return disksInfo, onlineDisks, offlineDisks
}
// returns sorted disksInfo slice which has only valid entries.
// i.e the entries where the total size of the disk is not stated
// as 0Bytes, this means that the disk is not online or ignored.
func sortValidDisksInfo(disksInfo []DiskInfo) []DiskInfo {
var validDisksInfo []DiskInfo
for _, diskInfo := range disksInfo {
if diskInfo.Total == 0 {
continue
}
validDisksInfo = append(validDisksInfo, diskInfo)
}
sort.Sort(byDiskTotal(validDisksInfo))
return validDisksInfo
}
// Get an aggregated storage info across all disks.
func getStorageInfo(disks []StorageAPI) StorageInfo {
disksInfo, onlineDisks, offlineDisks := getDisksInfo(disks)
// Sort so that the first element is the smallest.
validDisksInfo := sortValidDisksInfo(disksInfo)
// If there are no valid disks, set total and free disks to 0
if len(validDisksInfo) == 0 {
return StorageInfo{}
}
sort.Sort(byDiskTotal(disksInfo))
// Combine all disks to get total usage
usedList := make([]uint64, len(validDisksInfo))
totalList := make([]uint64, len(validDisksInfo))
availableList := make([]uint64, len(validDisksInfo))
mountPaths := make([]string, len(validDisksInfo))
usedList := make([]uint64, len(disksInfo))
totalList := make([]uint64, len(disksInfo))
availableList := make([]uint64, len(disksInfo))
mountPaths := make([]string, len(disksInfo))
for i, di := range validDisksInfo {
for i, di := range disksInfo {
usedList[i] = di.Used
totalList[i] = di.Total
availableList[i] = di.Free

@ -1,67 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2016 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"reflect"
"testing"
)
// Sort valid disks info.
func TestSortingValidDisks(t *testing.T) {
testCases := []struct {
disksInfo []DiskInfo
validDisksInfo []DiskInfo
}{
// One of the disks is offline.
{
disksInfo: []DiskInfo{
{Total: 150, Free: 10},
{Total: 0, Free: 0},
{Total: 200, Free: 10},
{Total: 100, Free: 10},
},
validDisksInfo: []DiskInfo{
{Total: 100, Free: 10},
{Total: 150, Free: 10},
{Total: 200, Free: 10},
},
},
// All disks are online.
{
disksInfo: []DiskInfo{
{Total: 150, Free: 10},
{Total: 200, Free: 10},
{Total: 100, Free: 10},
{Total: 115, Free: 10},
},
validDisksInfo: []DiskInfo{
{Total: 100, Free: 10},
{Total: 115, Free: 10},
{Total: 150, Free: 10},
{Total: 200, Free: 10},
},
},
}
for i, testCase := range testCases {
validDisksInfo := sortValidDisksInfo(testCase.disksInfo)
if !reflect.DeepEqual(validDisksInfo, testCase.validDisksInfo) {
t.Errorf("Test %d: Expected %#v, Got %#v", i+1, testCase.validDisksInfo, validDisksInfo)
}
}
}

@ -18,6 +18,7 @@ package dsync
import (
"context"
"errors"
"fmt"
golog "log"
"math"
@ -191,6 +192,12 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo
go func(index int, isReadLock bool, c NetLocker) {
defer wg.Done()
g := Granted{index: index}
if c == nil {
ch <- g
return
}
args := LockArgs{
UID: id,
Resource: lockName,
@ -209,7 +216,6 @@ func lock(ds *Dsync, locks *[]string, lockName, id, source string, isReadLock bo
}
}
g := Granted{index: index}
if locked {
g.lockUID = args.UID
}
@ -400,6 +406,11 @@ func unlock(ds *Dsync, locks []string, name string, isReadLock bool, restClnts [
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(ds *Dsync, c NetLocker, name, uid string, isReadLock bool) {
if c == nil {
log("Unable to call RUnlock", errors.New("netLocker is offline"))
return
}
args := LockArgs{
UID: uid,
Resource: name,

Loading…
Cancel
Save