fix: protect updating latencies/throughput slices in obd (#10611)

Additionally close the transferChan upon function exit.
master
Harshavardhana 4 years ago committed by GitHub
parent 3047121255
commit 98a08e1644
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      cmd/peer-rest-client.go
  2. 7
      pkg/disk/obd.go

@ -134,6 +134,7 @@ func (r *nullReader) Read(b []byte) (int, error) {
} }
func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, threadCount uint) (info madmin.NetOBDInfo, err error) { func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, threadCount uint) (info madmin.NetOBDInfo, err error) {
var mu sync.Mutex // mutex used to protect these slices in go-routines
latencies := []float64{} latencies := []float64{}
throughputs := []float64{} throughputs := []float64{}
@ -142,6 +143,8 @@ func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64,
totalTransferred := int64(0) totalTransferred := int64(0)
transferChan := make(chan int64, threadCount) transferChan := make(chan int64, threadCount)
defer close(transferChan)
go func() { go func() {
for v := range transferChan { for v := range transferChan {
atomic.AddInt64(&totalTransferred, v) atomic.AddInt64(&totalTransferred, v)
@ -226,13 +229,16 @@ func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64,
/* Throughput = (total data transferred across all threads / time taken) */ /* Throughput = (total data transferred across all threads / time taken) */
throughput := float64((after - before)) / latency throughput := float64((after - before)) / latency
// Protect updating latencies and throughputs slices from
// multiple go-routines.
mu.Lock()
latencies = append(latencies, latency) latencies = append(latencies, latency)
throughputs = append(throughputs, throughput) throughputs = append(throughputs, throughput)
mu.Unlock()
}(i) }(i)
} }
} }
wg.Wait() wg.Wait()
close(transferChan)
if err != nil { if err != nil {
return info, err return info, err

@ -27,9 +27,6 @@ import (
"github.com/montanaflynn/stats" "github.com/montanaflynn/stats"
) )
var globalLatency = map[string]Latency{}
var globalThroughput = map[string]Throughput{}
// Latency holds latency information for write operations to the drive // Latency holds latency information for write operations to the drive
type Latency struct { type Latency struct {
Avg float64 `json:"avg_secs,omitempty"` Avg float64 `json:"avg_secs,omitempty"`
@ -154,6 +151,7 @@ func GetOBDInfo(ctx context.Context, drive, fsPath string) (Latency, Throughput,
if minThroughput, err = stats.Min(throughputs); err != nil { if minThroughput, err = stats.Min(throughputs); err != nil {
return Latency{}, Throughput{}, err return Latency{}, Throughput{}, err
} }
t := Throughput{ t := Throughput{
Avg: avgThroughput, Avg: avgThroughput,
Percentile50: percentile50Throughput, Percentile50: percentile50Throughput,
@ -163,8 +161,5 @@ func GetOBDInfo(ctx context.Context, drive, fsPath string) (Latency, Throughput,
Max: maxThroughput, Max: maxThroughput,
} }
globalLatency[drive] = l
globalThroughput[drive] = t
return l, t, nil return l, t, nil
} }

Loading…
Cancel
Save