@ -23,7 +23,6 @@ import (
"encoding/gob"
"encoding/gob"
"errors"
"errors"
"io"
"io"
"io/ioutil"
"math"
"math"
"net/url"
"net/url"
"strconv"
"strconv"
@ -122,19 +121,22 @@ type progressReader struct {
func ( p * progressReader ) Read ( b [ ] byte ) ( int , error ) {
func ( p * progressReader ) Read ( b [ ] byte ) ( int , error ) {
n , err := p . r . Read ( b )
n , err := p . r . Read ( b )
if err != nil && err != io . EOF {
if n >= 0 {
return n , err
}
p . progressChan <- int64 ( n )
p . progressChan <- int64 ( n )
}
return n , err
return n , err
}
}
type nullReader struct { }
func ( r * nullReader ) Read ( b [ ] byte ) ( int , error ) {
return len ( b ) , nil
}
func ( client * peerRESTClient ) doNetOBDTest ( ctx context . Context , dataSize int64 , threadCount uint ) ( info madmin . NetOBDInfo , err error ) {
func ( client * peerRESTClient ) doNetOBDTest ( ctx context . Context , dataSize int64 , threadCount uint ) ( info madmin . NetOBDInfo , err error ) {
latencies := [ ] float64 { }
latencies := [ ] float64 { }
throughputs := [ ] float64 { }
throughputs := [ ] float64 { }
buf := make ( [ ] byte , dataSize )
buflimiter := make ( chan struct { } , threadCount )
buflimiter := make ( chan struct { } , threadCount )
errChan := make ( chan error , threadCount )
errChan := make ( chan error , threadCount )
@ -163,7 +165,7 @@ func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64,
}
}
}
}
wg := sync . WaitGroup { }
var wg sync . WaitGroup
finish := func ( ) {
finish := func ( ) {
<- buflimiter
<- buflimiter
wg . Done ( )
wg . Done ( )
@ -183,28 +185,27 @@ func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64,
}
}
go func ( i int ) {
go func ( i int ) {
bufReader := bytes . NewReader ( buf )
progress := & progressReader {
bufReadCloser := ioutil . NopCloser ( & progressReader {
r : io . LimitReader ( & nullReader { } , dataSize ) ,
r : bufReader ,
progressChan : transferChan ,
progressChan : transferChan ,
} )
}
start := time . Now ( )
start := time . Now ( )
before := atomic . LoadInt64 ( & totalTransferred )
before := atomic . LoadInt64 ( & totalTransferred )
ctx , cancel := context . WithTimeout ( innerCtx , 10 * time . Second )
ctx , cancel := context . WithTimeout ( innerCtx , 10 * time . Second )
defer cancel ( )
defer cancel ( )
respBody , err := client . callWithContext ( ctx , peerRESTMethodNetOBDInfo , nil , bufReadCloser , dataSize )
if err != nil {
if netErr , ok := err . ( * rest . NetworkError ) ; ok {
// Turn off healthCheckFn for OBD tests to cater for higher load on the peers.
if urlErr , ok := netErr . Err . ( * url . Error ) ; ok {
clnt := newPeerRESTClient ( client . host )
if urlErr . Err . Error ( ) == context . DeadlineExceeded . Error ( ) {
clnt . restClient . HealthCheckFn = nil
respBody , err := clnt . callWithContext ( ctx , peerRESTMethodNetOBDInfo , nil , progress , dataSize )
if err != nil {
if errors . Is ( err , context . DeadlineExceeded ) {
slowSample ( )
slowSample ( )
finish ( )
finish ( )
return
return
}
}
}
}
errChan <- err
errChan <- err
finish ( )
finish ( )
@ -329,16 +330,9 @@ func (client *peerRESTClient) NetOBDInfo(ctx context.Context) (info madmin.NetOB
continue
continue
}
}
if netErr , ok := err . ( * rest . NetworkError ) ; ok {
if errors . Is ( err , context . Canceled ) || errors . Is ( err , context . DeadlineExceeded ) {
if urlErr , ok := netErr . Err . ( * url . Error ) ; ok {
if urlErr . Err . Error ( ) == context . Canceled . Error ( ) {
continue
continue
}
}
if urlErr . Err . Error ( ) == context . DeadlineExceeded . Error ( ) {
continue
}
}
}
}
}
return info , err
return info , err
}
}