From 85a57d20210eb867b1c023e024cc4391d5ac5685 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 3 Apr 2018 23:58:48 -0500 Subject: [PATCH] Make sure to close the disk connections (#5752) Since we do not re-use storageDisks after moving the connections to object layer we should close them appropriately otherwise we have a lot of connection leaks and these can compound as the time goes by. This PR also refactors the initialization code to re-use storageDisks for given set of endpoints until we have confirmed a valid reference format. --- cmd/format-xl.go | 65 +++++++---------- cmd/format-xl_test.go | 12 +++- cmd/prepare-storage.go | 160 +++++++++++++++++++++++++++-------------- cmd/typed-errors.go | 4 ++ cmd/xl-sets.go | 52 +++++++------- cmd/xl-v1.go | 9 +-- 6 files changed, 171 insertions(+), 131 deletions(-) diff --git a/cmd/format-xl.go b/cmd/format-xl.go index 2df283da0..301a36245 100644 --- a/cmd/format-xl.go +++ b/cmd/format-xl.go @@ -315,27 +315,18 @@ func shouldInitXLDisks(errs []error) bool { } // loadFormatXLAll - load all format config from all input disks in parallel. -func loadFormatXLAll(endpoints EndpointList) ([]*formatXLV3, []error) { +func loadFormatXLAll(storageDisks []StorageAPI) ([]*formatXLV3, []error) { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} - bootstrapDisks := make([]StorageAPI, len(endpoints)) - for i, endpoint := range endpoints { - disk, err := newStorageAPI(endpoint) - if err != nil { - continue - } - bootstrapDisks[i] = disk - } - // Initialize list of errors. - var sErrs = make([]error, len(bootstrapDisks)) + var sErrs = make([]error, len(storageDisks)) // Initialize format configs. - var formats = make([]*formatXLV3, len(bootstrapDisks)) + var formats = make([]*formatXLV3, len(storageDisks)) // Load format from each disk in parallel - for index, disk := range bootstrapDisks { + for index, disk := range storageDisks { if disk == nil { sErrs[index] = errDiskNotFound continue @@ -344,10 +335,9 @@ func loadFormatXLAll(endpoints EndpointList) ([]*formatXLV3, []error) { // Launch go-routine per disk. go func(index int, disk StorageAPI) { defer wg.Done() + format, lErr := loadFormatXL(disk) if lErr != nil { - // close the internal connection, to avoid fd leaks. - disk.Close() sErrs[index] = lErr return } @@ -530,12 +520,7 @@ func formatXLV3Check(reference *formatXLV3, format *formatXLV3) error { } // saveFormatXLAll - populates `format.json` on disks in its order. -func saveFormatXLAll(endpoints EndpointList, formats []*formatXLV3) error { - storageDisks, err := initStorageDisks(endpoints) - if err != nil { - return err - } - +func saveFormatXLAll(storageDisks []StorageAPI, formats []*formatXLV3) error { var errs = make([]error, len(storageDisks)) var wg = &sync.WaitGroup{} @@ -556,17 +541,25 @@ func saveFormatXLAll(endpoints EndpointList, formats []*formatXLV3) error { // Wait for the routines to finish. wg.Wait() - writeQuorum := len(endpoints)/2 + 1 + writeQuorum := len(storageDisks)/2 + 1 return reduceWriteQuorumErrs(errs, nil, writeQuorum) } +// relinquishes the underlying connection for all storage disks. +func closeStorageDisks(storageDisks []StorageAPI) { + for _, disk := range storageDisks { + if disk == nil { + continue + } + disk.Close() + } +} + // Initialize storage disks based on input arguments. func initStorageDisks(endpoints EndpointList) ([]StorageAPI, error) { // Bootstrap disks. storageDisks := make([]StorageAPI, len(endpoints)) for index, endpoint := range endpoints { - // Intentionally ignore disk not found errors. XL is designed - // to handle these errors internally. storage, err := newStorageAPI(endpoint) if err != nil && err != errDiskNotFound { return nil, err @@ -598,12 +591,7 @@ func formatXLV3ThisEmpty(formats []*formatXLV3) bool { } // fixFormatXLV3 - fix format XL configuration on all disks. -func fixFormatXLV3(endpoints EndpointList, formats []*formatXLV3) error { - storageDisks, err := initStorageDisks(endpoints) - if err != nil { - return err - } - +func fixFormatXLV3(storageDisks []StorageAPI, endpoints EndpointList, formats []*formatXLV3) error { for i, format := range formats { if format == nil || !endpoints[i].IsLocal { continue @@ -617,7 +605,7 @@ func fixFormatXLV3(endpoints EndpointList, formats []*formatXLV3) error { } if format.XL.This == "" { formats[i].XL.This = format.XL.Sets[0][i] - if err = saveFormatXL(storageDisks[i], formats[i]); err != nil { + if err := saveFormatXL(storageDisks[i], formats[i]); err != nil { return err } } @@ -626,9 +614,9 @@ func fixFormatXLV3(endpoints EndpointList, formats []*formatXLV3) error { } // initFormatXL - save XL format configuration on all disks. -func initFormatXL(endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) { +func initFormatXL(storageDisks []StorageAPI, setCount, disksPerSet int) (format *formatXLV3, err error) { format = newFormatXLV3(setCount, disksPerSet) - formats := make([]*formatXLV3, len(endpoints)) + formats := make([]*formatXLV3, len(storageDisks)) for i := 0; i < setCount; i++ { for j := 0; j < disksPerSet; j++ { @@ -639,12 +627,12 @@ func initFormatXL(endpoints EndpointList, setCount, disksPerSet int) (format *fo } // Initialize meta volume, if volume already exists ignores it. - if err = initFormatXLMetaVolume(endpoints, formats); err != nil { + if err = initFormatXLMetaVolume(storageDisks, formats); err != nil { return format, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err) } // Save formats `format.json` across all disks. - if err = saveFormatXLAll(endpoints, formats); err != nil { + if err = saveFormatXLAll(storageDisks, formats); err != nil { return nil, err } @@ -675,12 +663,7 @@ func makeFormatXLMetaVolumes(disk StorageAPI) error { var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists) // Initializes meta volume on all input storage disks. -func initFormatXLMetaVolume(endpoints EndpointList, formats []*formatXLV3) error { - storageDisks, err := initStorageDisks(endpoints) - if err != nil { - return err - } - +func initFormatXLMetaVolume(storageDisks []StorageAPI, formats []*formatXLV3) error { // This happens for the first time, but keep this here since this // is the only place where it can be made expensive optimizing all // other calls. Create minio meta volume, if it doesn't exist yet. diff --git a/cmd/format-xl_test.go b/cmd/format-xl_test.go index db3bfc4ff..3e0e832eb 100644 --- a/cmd/format-xl_test.go +++ b/cmd/format-xl_test.go @@ -87,6 +87,11 @@ func TestFixFormatV3(t *testing.T) { } endpoints := mustGetNewEndpointList(xlDirs...) + storageDisks, err := initStorageDisks(endpoints) + if err != nil { + t.Fatal(err) + } + format := newFormatXLV3(1, 8) formats := make([]*formatXLV3, 8) @@ -96,17 +101,18 @@ func TestFixFormatV3(t *testing.T) { formats[j] = &newFormat } - if err = initFormatXLMetaVolume(endpoints, formats); err != nil { + if err = initFormatXLMetaVolume(storageDisks, formats); err != nil { t.Fatal(err) } formats[1] = nil expThis := formats[2].XL.This formats[2].XL.This = "" - if err := fixFormatXLV3(endpoints, formats); err != nil { + if err := fixFormatXLV3(storageDisks, endpoints, formats); err != nil { t.Fatal(err) } - newFormats, errs := loadFormatXLAll(endpoints) + + newFormats, errs := loadFormatXLAll(storageDisks) for _, err := range errs { if err != nil && errors.Cause(err) != errUnformattedDisk { t.Fatal(err) diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 4941e8552..a91521d34 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -88,6 +88,95 @@ func formatXLCleanupTmpLocalEndpoints(endpoints EndpointList) error { return nil } +// validate reference format against list of XL formats. +func validateXLFormats(format *formatXLV3, formats []*formatXLV3, endpoints EndpointList, setCount, drivesPerSet int) error { + for i := range formats { + if formats[i] == nil { + continue + } + if err := formatXLV3Check(format, formats[i]); err != nil { + return fmt.Errorf("%s format error: %s", endpoints[i], err) + } + } + if len(format.XL.Sets) != setCount { + return fmt.Errorf("Current backend format is inconsistent with input args (%s), Expected set count %d, got %d", endpoints, len(format.XL.Sets), setCount) + } + if len(format.XL.Sets[0]) != drivesPerSet { + return fmt.Errorf("Current backend format is inconsistent with input args (%s), Expected drive count per set %d, got %d", endpoints, len(format.XL.Sets[0]), drivesPerSet) + } + return nil +} + +// Following error message is added to fix a regression in release +// RELEASE.2018-03-16T22-52-12Z after migrating v1 to v2 to v3. This +// migration failed to capture '.This' field properly which indicates +// the disk UUID association. Below error message is returned when +// we see this situation in format.json, for more info refer +// https://github.com/minio/minio/issues/5667 +var errXLV3ThisEmpty = fmt.Errorf("XL format version 3 has This field empty") + +// connect to list of endpoints and load all XL disk formats, validate the formats are correct +// and are in quorum, if no formats are found attempt to initialize all of them for the first +// time. additionally make sure to close all the disks used in this attempt. +func connectLoadInitFormats(firstDisk bool, endpoints EndpointList, setCount, drivesPerSet int) (*formatXLV3, error) { + storageDisks, err := initStorageDisks(endpoints) + if err != nil { + return nil, err + } + defer closeStorageDisks(storageDisks) + + // Attempt to load all `format.json` from all disks. + formatConfigs, sErrs := loadFormatXLAll(storageDisks) + + // Pre-emptively check if one of the formatted disks + // is invalid. This function returns success for the + // most part unless one of the formats is not consistent + // with expected XL format. For example if a user is + // trying to pool FS backend into an XL set. + if err = checkFormatXLValues(formatConfigs); err != nil { + return nil, err + } + + for i, sErr := range sErrs { + if _, ok := formatCriticalErrors[errors.Cause(sErr)]; ok { + return nil, fmt.Errorf("Disk %s: %s", endpoints[i], sErr) + } + } + + if shouldInitXLDisks(sErrs) { + if !firstDisk { + return nil, errNotFirstDisk + } + return initFormatXL(storageDisks, setCount, drivesPerSet) + } + + // Following function is added to fix a regressions which was introduced + // in release RELEASE.2018-03-16T22-52-12Z after migrating v1 to v2 to v3. + // This migration failed to capture '.This' field properly which indicates + // the disk UUID association. Below function is called to handle and fix + // this regression, for more info refer https://github.com/minio/minio/issues/5667 + if err = fixFormatXLV3(storageDisks, endpoints, formatConfigs); err != nil { + return nil, err + } + + // If any of the .This field is still empty, we return error. + if formatXLV3ThisEmpty(formatConfigs) { + return nil, errXLV3ThisEmpty + } + + format, err := getFormatXLInQuorum(formatConfigs) + if err != nil { + return nil, err + } + + // Validate all format configs with reference format. + if err = validateXLFormats(format, formatConfigs, endpoints, setCount, drivesPerSet); err != nil { + return nil, err + } + + return format, nil +} + // Format disks before initialization of object layer. func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) { if len(endpoints) == 0 || setCount == 0 || disksPerSet == 0 { @@ -121,65 +210,26 @@ func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerS for { select { case _ = <-retryTimerCh: - // Attempt to load all `format.json` from all disks. - formatConfigs, sErrs := loadFormatXLAll(endpoints) - - // Pre-emptively check if one of the formatted disks - // is invalid. This function returns success for the - // most part unless one of the formats is not consistent - // with expected XL format. For example if a user is - // trying to pool FS backend into an XL set. - if err = checkFormatXLValues(formatConfigs); err != nil { - return nil, err - } - - for i, sErr := range sErrs { - if _, ok := formatCriticalErrors[errors.Cause(sErr)]; ok { - return nil, fmt.Errorf("Disk %s: %s", endpoints[i], sErr) - } - } - - if shouldInitXLDisks(sErrs) { - if !firstDisk { + format, err := connectLoadInitFormats(firstDisk, endpoints, setCount, disksPerSet) + if err != nil { + switch err { + case errNotFirstDisk: + // Fresh setup, wait for first server to be up. console.Println("Waiting for the first server to format the disks.") continue + case errXLReadQuorum: + // no quorum available continue to wait for minimum number of servers. + console.Printf("Waiting for a minimum of %d disks to come online (elapsed %s)\n", len(endpoints)/2, getElapsedTime()) + continue + case errXLV3ThisEmpty: + // need to wait for this error to be healed, so continue. + continue + default: + // For all other unhandled errors we exit and fail. + return nil, err } - return initFormatXL(endpoints, setCount, disksPerSet) - } - - // Following function is added to fix a regressions which was introduced - // in release RELEASE.2018-03-16T22-52-12Z after migrating v1 to v2 to v3. - // This migration failed to capture '.This' field properly which indicates - // the disk UUID association. Below function is called to handle and fix - // this regression, for more info refer https://github.com/minio/minio/issues/5667 - if err = fixFormatXLV3(endpoints, formatConfigs); err != nil { - return nil, err - } - - // If any of the .This field is still empty we wait them to be fixed. - if formatXLV3ThisEmpty(formatConfigs) { - continue - } - - format, err = getFormatXLInQuorum(formatConfigs) - if err == nil { - for i := range formatConfigs { - if formatConfigs[i] == nil { - continue - } - if err = formatXLV3Check(format, formatConfigs[i]); err != nil { - return nil, fmt.Errorf("%s format error: %s", endpoints[i], err) - } - } - if len(format.XL.Sets) != globalXLSetCount { - return nil, fmt.Errorf("Current backend format is inconsistent with input args (%s), Expected set count %d, got %d", endpoints, len(format.XL.Sets), globalXLSetCount) - } - if len(format.XL.Sets[0]) != globalXLSetDriveCount { - return nil, fmt.Errorf("Current backend format is inconsistent with input args (%s), Expected drive count per set %d, got %d", endpoints, len(format.XL.Sets[0]), globalXLSetDriveCount) - } - return format, nil } - console.Printf("Waiting for a minimum of %d disks to come online (elapsed %s)\n", len(endpoints)/2, getElapsedTime()) + return format, nil case <-globalOSSignalCh: return nil, fmt.Errorf("Initializing data volumes gracefully stopped") } diff --git a/cmd/typed-errors.go b/cmd/typed-errors.go index 196682ebf..b2cb978ab 100644 --- a/cmd/typed-errors.go +++ b/cmd/typed-errors.go @@ -58,3 +58,7 @@ var errInvalidRange = errors.New("Invalid range") // errInvalidRangeSource - returned when given range value exceeds // the source object size. var errInvalidRangeSource = errors.New("Range specified exceeds source object size") + +// error returned by disks which are to be initialized are waiting for the +// first server to initialize them in distributed set to initialize them. +var errNotFirstDisk = errors.New("Not first disk") diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 0a7d71ed5..34bc2644b 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -101,7 +101,7 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatXLV3, error) { format, err := loadFormatXL(disk) if err != nil { - // close the internal connection, to avoid fd leaks. + // Close the internal connection to avoid connection leaks. disk.Close() return nil, nil, err } @@ -147,6 +147,8 @@ func (s *xlSets) connectDisks() { i, j, err := findDiskIndex(s.format, format) s.formatMu.RUnlock() if err != nil { + // Close the internal connection to avoid connection leaks. + disk.Close() printEndpointError(endpoint, err) continue } @@ -215,19 +217,8 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP go s.sets[i].cleanupStaleMultipartUploads(globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh) } - for _, endpoint := range endpoints { - disk, nformat, err := connectEndpoint(endpoint) - if err != nil { - errorIf(err, "Unable to connect to endpoint %s", endpoint) - continue - } - i, j, err := findDiskIndex(format, nformat) - if err != nil { - errorIf(err, "Unable to find the endpoint %s in reference format", endpoint) - continue - } - s.xlDisks[i][j] = disk - } + // Connect disks right away. + s.connectDisks() // Initialize and load bucket policies. var err error @@ -267,7 +258,19 @@ func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo { storageInfo.Backend.RRSCData = rrSCData storageInfo.Backend.RRSCParity = rrSCparity - formats, sErrs := loadFormatXLAll(s.endpoints) + storageInfo.Backend.Sets = make([][]madmin.DriveInfo, s.setCount) + for i := range storageInfo.Backend.Sets { + storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet) + } + + storageDisks, err := initStorageDisks(s.endpoints) + if err != nil { + return storageInfo + } + defer closeStorageDisks(storageDisks) + + formats, sErrs := loadFormatXLAll(storageDisks) + drivesInfo := formatsToDrivesInfo(s.endpoints, formats, sErrs) refFormat, err := getFormatXLInQuorum(formats) if err != nil { @@ -276,11 +279,6 @@ func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo { return storageInfo } - storageInfo.Backend.Sets = make([][]madmin.DriveInfo, s.setCount) - for i := range storageInfo.Backend.Sets { - storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet) - } - // fill all the available/online endpoints for _, drive := range drivesInfo { if drive.UUID == "" { @@ -921,8 +919,14 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult } defer formatLock.Unlock() - formats, sErrs := loadFormatXLAll(s.endpoints) - if err := checkFormatXLValues(formats); err != nil { + storageDisks, err := initStorageDisks(s.endpoints) + if err != nil { + return madmin.HealResultItem{}, err + } + defer closeStorageDisks(storageDisks) + + formats, sErrs := loadFormatXLAll(storageDisks) + if err = checkFormatXLValues(formats); err != nil { return madmin.HealResultItem{}, err } @@ -1025,12 +1029,12 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult // Initialize meta volume, if volume already exists ignores it, all disks which // are not found are ignored as well. - if err = initFormatXLMetaVolume(s.endpoints, tmpNewFormats); err != nil { + if err = initFormatXLMetaVolume(storageDisks, tmpNewFormats); err != nil { return madmin.HealResultItem{}, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err) } // Save formats `format.json` across all disks. - if err = saveFormatXLAll(s.endpoints, tmpNewFormats); err != nil { + if err = saveFormatXLAll(storageDisks, tmpNewFormats); err != nil { return madmin.HealResultItem{}, err } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 9023f68b8..0f835fb06 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -59,14 +59,7 @@ var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolu // Shutdown function for object storage interface. func (xl xlObjects) Shutdown(ctx context.Context) error { // Add any object layer shutdown activities here. - for _, disk := range xl.getDisks() { - // This closes storage rpc client connections if any. - // Otherwise this is a no-op. - if disk == nil { - continue - } - disk.Close() - } + closeStorageDisks(xl.getDisks()) return nil }