xl: Implement MRF healing (#8470)

master
Anis Elleuch 5 years ago committed by Harshavardhana
parent 64fde1ab95
commit 935546d5ca
  1. 13
      cmd/admin-handlers_test.go
  2. 27
      cmd/test-utils_test.go
  3. 106
      cmd/xl-sets.go
  4. 15
      cmd/xl-v1-multipart.go
  5. 23
      cmd/xl-v1-object.go
  6. 10
      cmd/xl-v1.go

@ -47,6 +47,10 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) {
// reset global variables to start afresh. // reset global variables to start afresh.
resetTestGlobals() resetTestGlobals()
// Set globalIsXL to indicate that the setup uses an erasure
// code backend.
globalIsXL = true
// Initializing objectLayer for HealFormatHandler. // Initializing objectLayer for HealFormatHandler.
objLayer, xlDirs, xlErr := initTestXLObjLayer() objLayer, xlDirs, xlErr := initTestXLObjLayer()
if xlErr != nil { if xlErr != nil {
@ -63,15 +67,6 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) {
globalEndpoints = mustGetZoneEndpoints(xlDirs...) globalEndpoints = mustGetZoneEndpoints(xlDirs...)
// Set globalIsXL to indicate that the setup uses an erasure
// code backend.
globalIsXL = true
// Init global heal state
if globalIsXL {
globalAllHealState = initHealState()
}
globalConfigSys = NewConfigSys() globalConfigSys = NewConfigSys()
globalIAMSys = NewIAMSys() globalIAMSys = NewIAMSys()

@ -459,15 +459,30 @@ func resetGlobalIsXL() {
// reset global heal state // reset global heal state
func resetGlobalHealState() { func resetGlobalHealState() {
// Init global heal state
if globalAllHealState == nil { if globalAllHealState == nil {
return globalAllHealState = initHealState()
} else {
globalAllHealState.Lock()
for _, v := range globalAllHealState.healSeqMap {
if !v.hasEnded() {
v.stop()
}
}
globalAllHealState.Unlock()
} }
globalAllHealState.Lock()
defer globalAllHealState.Unlock() // Init background heal state
for _, v := range globalAllHealState.healSeqMap { if globalBackgroundHealState == nil {
if !v.hasEnded() { globalBackgroundHealState = initHealState()
v.stop() } else {
globalBackgroundHealState.Lock()
for _, v := range globalBackgroundHealState.healSeqMap {
if !v.hasEnded() {
v.stop()
}
} }
globalBackgroundHealState.Unlock()
} }
} }

@ -56,6 +56,11 @@ func (s setsStorageAPI) Close() error {
return nil return nil
} }
// Information of a new disk connection
type diskConnectInfo struct {
setIndex int
}
// xlSets implements ObjectLayer combining a static list of erasure coded // xlSets implements ObjectLayer combining a static list of erasure coded
// object sets. NOTE: There is no dynamic scaling allowed or intended in // object sets. NOTE: There is no dynamic scaling allowed or intended in
// current design. // current design.
@ -80,6 +85,8 @@ type xlSets struct {
// Total number of sets and the number of disks per set. // Total number of sets and the number of disks per set.
setCount, drivesPerSet int setCount, drivesPerSet int
disksConnectEvent chan diskConnectInfo
// Done channel to control monitoring loop. // Done channel to control monitoring loop.
disksConnectDoneCh chan struct{} disksConnectDoneCh chan struct{}
@ -88,6 +95,9 @@ type xlSets struct {
// Merge tree walk // Merge tree walk
pool *MergeWalkPool pool *MergeWalkPool
mrfMU sync.Mutex
mrfUploads map[string]int
} }
// isConnected - checks if the endpoint is connected or not. // isConnected - checks if the endpoint is connected or not.
@ -135,6 +145,8 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatXLV3, error) {
// findDiskIndex - returns the i,j'th position of the input `format` against the reference // findDiskIndex - returns the i,j'th position of the input `format` against the reference
// format, after successful validation. // format, after successful validation.
// - i'th position is the set index
// - j'th position is the disk index in the current set
func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) { func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) {
if err := formatXLV3Check(refFormat, format); err != nil { if err := formatXLV3Check(refFormat, format); err != nil {
return 0, 0, err return 0, 0, err
@ -198,7 +210,7 @@ func (s *xlSets) connectDisks() {
printEndpointError(endpoint, err) printEndpointError(endpoint, err)
continue continue
} }
i, j, err := findDiskIndex(s.format, format) setIndex, diskIndex, err := findDiskIndex(s.format, format)
if err != nil { if err != nil {
// Close the internal connection to avoid connection leaks. // Close the internal connection to avoid connection leaks.
disk.Close() disk.Close()
@ -207,8 +219,14 @@ func (s *xlSets) connectDisks() {
} }
disk.SetDiskID(format.XL.This) disk.SetDiskID(format.XL.This)
s.xlDisksMu.Lock() s.xlDisksMu.Lock()
s.xlDisks[i][j] = disk s.xlDisks[setIndex][diskIndex] = disk
s.xlDisksMu.Unlock() s.xlDisksMu.Unlock()
// Send a new disk connect event with a timeout
select {
case s.disksConnectEvent <- diskConnectInfo{setIndex: setIndex}:
case <-time.After(100 * time.Millisecond):
}
} }
} }
@ -216,6 +234,7 @@ func (s *xlSets) connectDisks() {
// endpoints by reconnecting them and making sure to place them into right position in // endpoints by reconnecting them and making sure to place them into right position in
// the set topology, this monitoring happens at a given monitoring interval. // the set topology, this monitoring happens at a given monitoring interval.
func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) { func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) {
ticker := time.NewTicker(monitorInterval) ticker := time.NewTicker(monitorInterval)
// Stop the timer. // Stop the timer.
defer ticker.Stop() defer ticker.Stop()
@ -264,9 +283,11 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS
setCount: setCount, setCount: setCount,
drivesPerSet: drivesPerSet, drivesPerSet: drivesPerSet,
format: format, format: format,
disksConnectEvent: make(chan diskConnectInfo),
disksConnectDoneCh: make(chan struct{}), disksConnectDoneCh: make(chan struct{}),
distributionAlgo: format.XL.DistributionAlgo, distributionAlgo: format.XL.DistributionAlgo,
pool: NewMergeWalkPool(globalMergeLookupTimeout), pool: NewMergeWalkPool(globalMergeLookupTimeout),
mrfUploads: make(map[string]int),
} }
mutex := newNSLock(globalIsDistXL) mutex := newNSLock(globalIsDistXL)
@ -281,10 +302,11 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS
// Initialize xl objects for a given set. // Initialize xl objects for a given set.
s.sets[i] = &xlObjects{ s.sets[i] = &xlObjects{
getDisks: s.GetDisks(i), getDisks: s.GetDisks(i),
getLockers: s.GetLockers(i), getLockers: s.GetLockers(i),
nsMutex: mutex, nsMutex: mutex,
bp: bp, bp: bp,
mrfUploadCh: make(chan partialUpload, 10000),
} }
go s.sets[i].cleanupStaleMultipartUploads(context.Background(), go s.sets[i].cleanupStaleMultipartUploads(context.Background(),
@ -304,6 +326,9 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS
// Start the disk monitoring and connect routine. // Start the disk monitoring and connect routine.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval) go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
go s.maintainMRFList()
go s.healMRFRoutine()
return s, nil return s, nil
} }
@ -1665,3 +1690,72 @@ func (s *xlSets) IsReady(_ context.Context) bool {
// Disks are not ready // Disks are not ready
return false return false
} }
// maintainMRFList gathers the list of successful partial uploads
// from all underlying xl sets and puts them in a global map which
// should not have more than 10000 entries.
func (s *xlSets) maintainMRFList() {
var agg = make(chan partialUpload, 10000)
for i, xl := range s.sets {
go func(c <-chan partialUpload, setIndex int) {
for msg := range c {
msg.failedSet = setIndex
select {
case agg <- msg:
default:
}
}
}(xl.mrfUploadCh, i)
}
for fUpload := range agg {
s.mrfMU.Lock()
if len(s.mrfUploads) > 10000 {
s.mrfMU.Unlock()
continue
}
s.mrfUploads[pathJoin(fUpload.bucket, fUpload.object)] = fUpload.failedSet
s.mrfMU.Unlock()
}
}
// healMRFRoutine monitors new disks connection, sweep the MRF list
// to find objects related to the new disk that needs to be healed.
func (s *xlSets) healMRFRoutine() {
// Wait until background heal state is initialized
var bgSeq *healSequence
for {
var ok bool
bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if ok {
break
}
time.Sleep(time.Second)
}
for e := range s.disksConnectEvent {
// Get the list of objects related the xl set
// to which the connected disk belongs.
var mrfUploads []string
s.mrfMU.Lock()
for k, v := range s.mrfUploads {
if v == e.setIndex {
mrfUploads = append(mrfUploads, k)
}
}
s.mrfMU.Unlock()
// Heal objects
for _, u := range mrfUploads {
// Send an object to be healed with a timeout
select {
case bgSeq.sourceCh <- u:
case <-time.After(100 * time.Millisecond):
}
s.mrfMU.Lock()
delete(s.mrfUploads, u)
s.mrfMU.Unlock()
}
}
}

@ -584,8 +584,10 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID) uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID)
storageDisks := xl.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), minioMetaMultipartBucket, uploadIDPath) partsMetadata, errs := readAllXLMetadata(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath)
// get Quorum for this object // get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs) _, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs)
@ -598,7 +600,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
return oi, toObjectErr(reducedErr, bucket, object) return oi, toObjectErr(reducedErr, bucket, object)
} }
onlineDisks, modTime := listOnlineDisks(xl.getDisks(), partsMetadata, errs) onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Calculate full object size. // Calculate full object size.
var objectSize int64 var objectSize int64
@ -743,10 +745,17 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
} }
// Rename the multipart object to final location. // Rename the multipart object to final location.
if _, err = rename(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, true, writeQuorum, nil); err != nil { if onlineDisks, err = rename(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, true, writeQuorum, nil); err != nil {
return oi, toObjectErr(err, bucket, object) return oi, toObjectErr(err, bucket, object)
} }
// Check if there is any offline disk and add it to the MRF list
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] == nil || storageDisks[i] == nil {
xl.addPartialUpload(bucket, object)
}
}
// Success, return object info. // Success, return object info.
return xlMeta.ToObjectInfo(bucket, object), nil return xlMeta.ToObjectInfo(bucket, object), nil
} }

@ -626,7 +626,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
// NOTE: Do not use online disks slice here: the reason is that existing object should be purged // NOTE: Do not use online disks slice here: the reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors. Also allow renaming the // regardless of `xl.json` status and rolled back in case of errors. Also allow renaming the
// existing object if it is not present in quorum disks so users can overwrite stale objects. // existing object if it is not present in quorum disks so users can overwrite stale objects.
_, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, true, writeQuorum, []error{errFileNotFound}) _, err = rename(ctx, storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, true, writeQuorum, []error{errFileNotFound})
if err != nil { if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
@ -646,11 +646,19 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
} }
// Rename the successfully written temporary object to final location. // Rename the successfully written temporary object to final location.
_, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil) if onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil); err != nil {
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
// Whether a disk was initially or becomes offline
// during this upload, send it to the MRF list.
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] == nil || storageDisks[i] == nil {
xl.addPartialUpload(bucket, object)
break
}
}
// Object info is the same in all disks, so we can pick the first meta // Object info is the same in all disks, so we can pick the first meta
// of the first disk // of the first disk
xlMeta = partsMetadata[0] xlMeta = partsMetadata[0]
@ -960,3 +968,12 @@ func (xl xlObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat
} }
return listObjectsV2Info, err return listObjectsV2Info, err
} }
// Send the successul but partial upload, however ignore
// if the channel is blocked by other items.
func (xl xlObjects) addPartialUpload(bucket, key string) {
select {
case xl.mrfUploadCh <- partialUpload{bucket: bucket, object: key}:
default:
}
}

@ -39,6 +39,14 @@ const (
// OfflineDisk represents an unavailable disk. // OfflineDisk represents an unavailable disk.
var OfflineDisk StorageAPI // zero value is nil var OfflineDisk StorageAPI // zero value is nil
// partialUpload is a successful upload of an object
// but not written in all disks (having quorum)
type partialUpload struct {
bucket string
object string
failedSet int
}
// xlObjects - Implements XL object layer. // xlObjects - Implements XL object layer.
type xlObjects struct { type xlObjects struct {
// getDisks returns list of storageAPIs. // getDisks returns list of storageAPIs.
@ -55,6 +63,8 @@ type xlObjects struct {
// TODO: ListObjects pool management, should be removed in future. // TODO: ListObjects pool management, should be removed in future.
listPool *TreeWalkPool listPool *TreeWalkPool
mrfUploadCh chan partialUpload
} }
// NewNSLock - initialize a new namespace RWLocker instance. // NewNSLock - initialize a new namespace RWLocker instance.

Loading…
Cancel
Save