diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index ca2c465f5..f7c7c6eef 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -23,7 +23,6 @@ import ( "encoding/gob" "errors" "io" - "io/ioutil" "math" "net/url" "strconv" @@ -122,19 +121,22 @@ type progressReader struct { func (p *progressReader) Read(b []byte) (int, error) { n, err := p.r.Read(b) - if err != nil && err != io.EOF { - return n, err + if n >= 0 { + p.progressChan <- int64(n) } - p.progressChan <- int64(n) return n, err } +type nullReader struct{} + +func (r *nullReader) Read(b []byte) (int, error) { + return len(b), nil +} + func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, threadCount uint) (info madmin.NetOBDInfo, err error) { latencies := []float64{} throughputs := []float64{} - buf := make([]byte, dataSize) - buflimiter := make(chan struct{}, threadCount) errChan := make(chan error, threadCount) @@ -163,7 +165,7 @@ func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, } } - wg := sync.WaitGroup{} + var wg sync.WaitGroup finish := func() { <-buflimiter wg.Done() @@ -183,27 +185,26 @@ func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, } go func(i int) { - bufReader := bytes.NewReader(buf) - bufReadCloser := ioutil.NopCloser(&progressReader{ - r: bufReader, + progress := &progressReader{ + r: io.LimitReader(&nullReader{}, dataSize), progressChan: transferChan, - }) + } start := time.Now() before := atomic.LoadInt64(&totalTransferred) ctx, cancel := context.WithTimeout(innerCtx, 10*time.Second) defer cancel() - respBody, err := client.callWithContext(ctx, peerRESTMethodNetOBDInfo, nil, bufReadCloser, dataSize) - if err != nil { - if netErr, ok := err.(*rest.NetworkError); ok { - if urlErr, ok := netErr.Err.(*url.Error); ok { - if urlErr.Err.Error() == context.DeadlineExceeded.Error() { - slowSample() - finish() - return - } - } + // Turn off healthCheckFn for OBD tests to cater for higher load on the peers. + clnt := newPeerRESTClient(client.host) + clnt.restClient.HealthCheckFn = nil + + respBody, err := clnt.callWithContext(ctx, peerRESTMethodNetOBDInfo, nil, progress, dataSize) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + slowSample() + finish() + return } errChan <- err @@ -329,15 +330,8 @@ func (client *peerRESTClient) NetOBDInfo(ctx context.Context) (info madmin.NetOB continue } - if netErr, ok := err.(*rest.NetworkError); ok { - if urlErr, ok := netErr.Err.(*url.Error); ok { - if urlErr.Err.Error() == context.Canceled.Error() { - continue - } - if urlErr.Err.Error() == context.DeadlineExceeded.Error() { - continue - } - } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + continue } } return info, err diff --git a/pkg/disk/obd.go b/pkg/disk/obd.go index 4580967b0..b96d53efb 100644 --- a/pkg/disk/obd.go +++ b/pkg/disk/obd.go @@ -65,13 +65,6 @@ func GetOBDInfo(ctx context.Context, drive, fsPath string) (Latency, Throughput, os.Remove(fsPath) }() - // going to leave this here incase we decide to go back to caching again - // if gl, ok := globalLatency[drive]; ok { - // if gt, ok := globalThroughput[drive]; ok { - // return gl, gt, nil - // } - // } - blockSize := 4 * humanize.MiByte fileSize := 256 * humanize.MiByte @@ -94,6 +87,9 @@ func GetOBDInfo(ctx context.Context, drive, fsPath string) (Latency, Throughput, latencies[i] = float64(latencyInSecs) } + // Sync every full writes fdatasync + Fdatasync(w) + for i := range latencies { throughput := float64(blockSize) / latencies[i] throughputs[i] = throughput