Fix config subsystem to wait on quorum number of formatted disks (#6407)

master
Harshavardhana 6 years ago committed by Nitish Tiwari
parent 81b7e5c7a8
commit d0d015361c
  1. 6
      cmd/server-main.go
  2. 32
      cmd/xl-sets.go

@ -318,6 +318,9 @@ func serverMain(ctx *cli.Context) {
initFederatorBackend(newObject) initFederatorBackend(newObject)
} }
// Re-enable logging
logger.Disable = false
// Create a new config system. // Create a new config system.
globalConfigSys = NewConfigSys() globalConfigSys = NewConfigSys()
@ -336,9 +339,6 @@ func serverMain(ctx *cli.Context) {
logger.FatalIf(err, "Unable to initialize disk caching") logger.FatalIf(err, "Unable to initialize disk caching")
} }
// Re-enable logging
logger.Disable = false
// Create new policy system. // Create new policy system.
globalPolicySys = NewPolicySys() globalPolicySys = NewPolicySys()

@ -168,6 +168,34 @@ func (s *xlSets) reInitDisks(refFormat *formatXLV3, storageDisks []StorageAPI, f
return xlDisks return xlDisks
} }
// connectDisksWithQuorum is same as connectDisks but waits
// for quorum number of formatted disks to be online in
// any given sets.
func (s *xlSets) connectDisksWithQuorum() {
var onlineDisks int
for onlineDisks < (len(s.endpoints)/2)+1 {
for _, endpoint := range s.endpoints {
if s.isConnected(endpoint) {
continue
}
disk, format, err := connectEndpoint(endpoint)
if err != nil {
printEndpointError(endpoint, err)
continue
}
i, j, err := findDiskIndex(s.format, format)
if err != nil {
// Close the internal connection to avoid connection leaks.
disk.Close()
printEndpointError(endpoint, err)
continue
}
s.xlDisks[i][j] = disk
onlineDisks++
}
}
}
// connectDisks - attempt to connect all the endpoints, loads format // connectDisks - attempt to connect all the endpoints, loads format
// and re-arranges the disks in proper position. // and re-arranges the disks in proper position.
func (s *xlSets) connectDisks() { func (s *xlSets) connectDisks() {
@ -260,8 +288,8 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
go s.sets[i].cleanupStaleMultipartUploads(context.Background(), globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh) go s.sets[i].cleanupStaleMultipartUploads(context.Background(), globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh)
} }
// Connect disks right away. // Connect disks right away, but wait until we have `format.json` quorum.
s.connectDisks() s.connectDisksWithQuorum()
// Start the disk monitoring and connect routine. // Start the disk monitoring and connect routine.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval) go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)

Loading…
Cancel
Save