You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
minio/cmd/peer-rest-client.go

1026 lines
28 KiB

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"crypto/tls"
"encoding/gob"
"io"
"io/ioutil"
"math"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
bucketsse "github.com/minio/minio/pkg/bucket/encryption"
"github.com/minio/minio/pkg/bucket/lifecycle"
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
trace "github.com/minio/minio/pkg/trace"
)
// client to talk to peer Nodes.
type peerRESTClient struct {
// host is the peer's address; String() returns host.String().
host *xnet.Host
// restClient issues the actual REST calls and is closed by Close().
restClient *rest.Client
// connected is 1 while the peer is considered online and 0 otherwise;
// the methods in this file only touch it through sync/atomic.
connected int32
}
// reConnect atomically flags the client as connected again.
func (client *peerRESTClient) reConnect() {
	const online int32 = 1
	atomic.StoreInt32(&client.connected, online)
}
// call issues a peer REST request bound to GlobalContext. It is a thin
// convenience wrapper around callWithContext and shares its handling of
// network errors.
func (client *peerRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
	respBody, err = client.callWithContext(GlobalContext, method, values, body, length)
	return respBody, err
}
// callWithContext wraps restClient.CallWithContext. A client that is currently
// marked offline is flagged connected again before the request is attempted,
// and nil values are replaced by an empty url.Values. If the request fails
// with a network error (per isNetworkError) the client is marked offline.
// On any failure the returned body is nil.
func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
	if !client.IsOnline() {
		client.reConnect()
	}

	if values == nil {
		values = make(url.Values)
	}

	resp, callErr := client.restClient.CallWithContext(ctx, method, values, body, length)
	if callErr != nil {
		if isNetworkError(callErr) {
			atomic.StoreInt32(&client.connected, 0)
		}
		return nil, callErr
	}
	return resp, nil
}
// String implements fmt.Stringer by returning the peer host's own string form.
func (client *peerRESTClient) String() string {
	peer := client.host
	return peer.String()
}
// IsOnline reports whether the connected flag is currently set to 1.
func (client *peerRESTClient) IsOnline() bool {
	state := atomic.LoadInt32(&client.connected)
	return state == 1
}
// Close marks the client offline and closes the underlying REST client.
// It always returns nil.
func (client *peerRESTClient) Close() error {
	const offline int32 = 0
	atomic.StoreInt32(&client.connected, offline)
	client.restClient.Close()
	return nil
}
// GetLocksResp stores various info from the client for each lock that is requested.
// It is the value GetLocks gob-decodes a peer's response into.
type GetLocksResp []map[string][]lockRequesterInfo
// GetLocks calls peerRESTMethodGetLocks on the remote node and gob-decodes
// the response body into locks.
func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) {
	body, callErr := client.call(peerRESTMethodGetLocks, nil, nil, -1)
	if callErr != nil {
		return locks, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&locks); decErr != nil {
		return locks, decErr
	}
	return locks, nil
}
// ServerInfo calls peerRESTMethodServerInfo on the remote node and
// gob-decodes the response body into info.
func (client *peerRESTClient) ServerInfo() (info madmin.ServerProperties, err error) {
	body, callErr := client.call(peerRESTMethodServerInfo, nil, nil, -1)
	if callErr != nil {
		return info, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&info); decErr != nil {
		return info, decErr
	}
	return info, nil
}
// networkOverloadedErr is the error type reported when too many samples of a
// network OBD test turn out slow.
type networkOverloadedErr struct{}

// networkOverloaded is the shared sentinel value; callers compare against it
// with ==.
var networkOverloaded = networkOverloadedErr{}

// Error implements the error interface.
func (networkOverloadedErr) Error() string {
	const msg = "network overloaded"
	return msg
}
// progressReader wraps an io.Reader and publishes the size of every
// successful read on progressChan.
type progressReader struct {
	r            io.Reader
	progressChan chan int64
}

// Read reads from the wrapped reader. When the read succeeds or ends with
// io.EOF, the byte count (possibly zero) is sent on progressChan; the send
// blocks until the channel accepts it. Any other error is returned without
// reporting progress.
func (pr *progressReader) Read(b []byte) (int, error) {
	count, readErr := pr.r.Read(b)
	switch readErr {
	case nil, io.EOF:
		pr.progressChan <- int64(count)
	}
	return count, readErr
}
// doNetOBDTest measures latency and throughput towards the peer by uploading
// dataSize-byte payloads through peerRESTMethodNetOBDInfo.
//
// At most threadCount uploads run concurrently and 10*threadCount samples are
// attempted. An upload that hits its 10s deadline, or whose latency exceeds
// maxLatencyForSizeThreads(dataSize, threadCount), counts as a slow sample;
// once roughly 5% of the samples are slow, networkOverloaded is reported and
// the remaining uploads are cancelled.
//
// It returns ctx.Err() if ctx is done while samples are being scheduled, an
// error reported by one of the samples, or otherwise the statistics computed
// by xnet.ComputeOBDStats over the collected samples.
func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, threadCount uint) (info madmin.NetOBDInfo, err error) {
	// ensure enough samples to obtain normal distribution
	maxSamples := int(10 * threadCount)

	// Samples are appended by many goroutines, so guard them with a mutex.
	var statsMu sync.Mutex
	latencies := make([]float64, 0, maxSamples)
	throughputs := make([]float64, 0, maxSamples)

	buf := make([]byte, dataSize)
	buflimiter := make(chan struct{}, threadCount)

	// Every sample goroutine sends at most one value, so a buffer of
	// maxSamples guarantees a sender never blocks once the scheduling loop
	// below has stopped receiving.
	errChan := make(chan error, maxSamples)

	totalTransferred := int64(0)
	transferChan := make(chan int64, threadCount)
	// NOTE(review): transferChan is never closed, so this aggregator outlives
	// the call. Closing it looks unsafe while a progressReader might still be
	// read by the HTTP transport — confirm and address separately.
	go func() {
		for v := range transferChan {
			atomic.AddInt64(&totalTransferred, v)
		}
	}()

	innerCtx, cancel := context.WithCancel(ctx)
	// Always release the derived context, also on the success path.
	defer cancel()

	slowSamples := int32(0)
	maxSlowSamples := int32(maxSamples / 20)
	slowSample := func() {
		if atomic.LoadInt32(&slowSamples) > maxSlowSamples { // 5% of total
			return
		}
		if atomic.AddInt32(&slowSamples, 1) >= maxSlowSamples {
			errChan <- networkOverloaded
			cancel()
		}
	}

	var wg sync.WaitGroup
	// release frees the concurrency slot taken from buflimiter.
	release := func() {
		<-buflimiter
	}

	for i := 0; i < maxSamples; i++ {
		select {
		case <-ctx.Done():
			return info, ctx.Err()
		case err = <-errChan:
		case buflimiter <- struct{}{}:
			if innerCtx.Err() != nil {
				release()
				continue
			}

			wg.Add(1)
			go func() {
				// Signal completion only after the sample has been fully
				// recorded, so wg.Wait() below observes every result.
				defer wg.Done()

				bufReadCloser := ioutil.NopCloser(&progressReader{
					r:            bytes.NewReader(buf),
					progressChan: transferChan,
				})
				start := time.Now()
				before := atomic.LoadInt64(&totalTransferred)

				reqCtx, reqCancel := context.WithTimeout(innerCtx, 10*time.Second)
				defer reqCancel()

				respBody, err := client.callWithContext(reqCtx, peerRESTMethodNetOBDInfo, nil, bufReadCloser, dataSize)
				if err != nil {
					if netErr, ok := err.(*rest.NetworkError); ok {
						if urlErr, ok := netErr.Err.(*url.Error); ok {
							if urlErr.Err.Error() == context.DeadlineExceeded.Error() {
								slowSample()
								release()
								return
							}
						}
					}
					errChan <- err
					release()
					return
				}
				http.DrainBody(respBody)

				after := atomic.LoadInt64(&totalTransferred)
				release()

				latency := time.Since(start).Seconds()
				if latency > maxLatencyForSizeThreads(dataSize, threadCount) {
					slowSample()
				}

				/* Throughput = (total data transferred across all threads / time taken) */
				throughput := float64(after-before) / latency

				statsMu.Lock()
				latencies = append(latencies, latency)
				throughputs = append(throughputs, throughput)
				statsMu.Unlock()
			}()
		}
	}
	wg.Wait()

	// Pick up an error that was reported after the scheduling loop stopped
	// listening on errChan.
	if err == nil {
		select {
		case err = <-errChan:
		default:
		}
	}
	if err != nil {
		return info, err
	}

	latency, throughput, err := xnet.ComputeOBDStats(latencies, throughputs)
	info = madmin.NetOBDInfo{
		Latency:    latency,
		Throughput: throughput,
	}
	return info, err
}
// maxLatencyForSizeThreads returns the maximum acceptable latency, in seconds,
// for one upload of size bytes when threadCount uploads run concurrently.
//
// size*threadCount is treated as the number of bytes the link is expected to
// move per second. Given the current defaults, each size/thread combination is
// supposed to saturate the intended pipe when all threads are active, i.e. in
// a perfectly controlled environment every thread would finish in 1s.
//
// The budget assumes a normal distribution of latency with a mean of 1s and a
// minimum of 0s, so 95% of threads should complete within 2s (two standard
// deviations from the mean); no more than 5% are expected to take longer.
//
//	throughput | max latency
//	100 Gbit   | 2s
//	 40 Gbit   | 2s
//	 25 Gbit   | 2s
//	 10 Gbit   | 2s
//	  1 Gbit   | inf
//
// Anything below the 10 Gbit tier gets math.MaxFloat64, i.e. no limit.
func maxLatencyForSizeThreads(size int64, threadCount uint) float64 {
	gib := float64(humanize.GiByte)

	// Tiers ordered from fastest to slowest link; thresholds are the link
	// speed expressed in bytes per second (Gbit / 8).
	tiers := []struct {
		minBytes   float64
		maxLatency float64
	}{
		{minBytes: 12.5 * gib, maxLatency: 2.0},  // 100 Gbit
		{minBytes: 5.0 * gib, maxLatency: 2.0},   // 40 Gbit
		{minBytes: 3.125 * gib, maxLatency: 2.0}, // 25 Gbit (25/8, was mistakenly 3.25)
		{minBytes: 1.25 * gib, maxLatency: 2.0},  // 10 Gbit
	}

	throughput := float64(size * int64(threadCount))
	for _, tier := range tiers {
		if throughput >= tier.minBytes {
			return tier.maxLatency
		}
	}
	return math.MaxFloat64
}
// NetOBDInfo runs the network OBD test against the remote node, starting with
// the heaviest load profile and stepping down while the network cannot keep up.
//
//	100 Gbit -> 256 MiB * 50 threads
//	 40 Gbit -> 256 MiB * 20 threads
//	 25 Gbit -> 128 MiB * 25 threads
//	 10 Gbit -> 128 MiB * 10 threads
//	  1 Gbit ->  64 MiB *  2 threads
//
// A step is abandoned in favour of the next one when doNetOBDTest reports
// networkOverloaded, or a rest.NetworkError wrapping a url.Error whose cause
// reads as context cancellation or deadline expiry. The result of the first
// step that succeeds, or fails with any other error, is returned. If every
// step is abandoned, the outcome of the last step is returned.
func (client *peerRESTClient) NetOBDInfo(ctx context.Context) (info madmin.NetOBDInfo, err error) {
	type step struct {
		size    int64
		threads uint
	}

	steps := []step{
		{size: 256 * humanize.MiByte, threads: 50}, // 100 Gbit
		{size: 256 * humanize.MiByte, threads: 20}, // 40 Gbit
		{size: 128 * humanize.MiByte, threads: 25}, // 25 Gbit
		{size: 128 * humanize.MiByte, threads: 10}, // 10 Gbit
		{size: 64 * humanize.MiByte, threads: 2},   // 1 Gbit
	}

	// tryLighter reports whether a failure means the next, lighter step
	// should be attempted.
	tryLighter := func(e error) bool {
		if e == networkOverloaded {
			return true
		}
		netErr, ok := e.(*rest.NetworkError)
		if !ok {
			return false
		}
		urlErr, ok := netErr.Err.(*url.Error)
		if !ok {
			return false
		}
		switch urlErr.Err.Error() {
		case context.Canceled.Error(), context.DeadlineExceeded.Error():
			return true
		}
		return false
	}

	for _, s := range steps {
		info, err = client.doNetOBDTest(ctx, s.size, s.threads)
		if err != nil && tryLighter(err) {
			continue
		}
		return info, err
	}
	return info, err
}
// DispatchNetOBDInfo calls peerRESTMethodDispatchNetOBDInfo on the remote
// node, passes the response through waitForHTTPResponse and gob-decodes the
// resulting stream into info.
func (client *peerRESTClient) DispatchNetOBDInfo(ctx context.Context) (info madmin.ServerNetOBDInfo, err error) {
	body, callErr := client.callWithContext(ctx, peerRESTMethodDispatchNetOBDInfo, nil, nil, -1)
	if callErr != nil {
		return info, callErr
	}
	defer http.DrainBody(body)

	payload, waitErr := waitForHTTPResponse(body)
	if waitErr != nil {
		return info, waitErr
	}

	err = gob.NewDecoder(payload).Decode(&info)
	return info, err
}
// DriveOBDInfo calls peerRESTMethodDriveOBDInfo on the remote node and
// gob-decodes the response body into info.
func (client *peerRESTClient) DriveOBDInfo(ctx context.Context) (info madmin.ServerDrivesOBDInfo, err error) {
	body, callErr := client.callWithContext(ctx, peerRESTMethodDriveOBDInfo, nil, nil, -1)
	if callErr != nil {
		return info, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&info); decErr != nil {
		return info, decErr
	}
	return info, nil
}
// CPUOBDInfo calls peerRESTMethodCPUOBDInfo on the remote node and
// gob-decodes the response body into info.
func (client *peerRESTClient) CPUOBDInfo(ctx context.Context) (info madmin.ServerCPUOBDInfo, err error) {
	body, callErr := client.callWithContext(ctx, peerRESTMethodCPUOBDInfo, nil, nil, -1)
	if callErr != nil {
		return info, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&info); decErr != nil {
		return info, decErr
	}
	return info, nil
}
// DiskHwOBDInfo calls peerRESTMethodDiskHwOBDInfo on the remote node and
// gob-decodes the response body into info.
func (client *peerRESTClient) DiskHwOBDInfo(ctx context.Context) (info madmin.ServerDiskHwOBDInfo, err error) {
	body, callErr := client.callWithContext(ctx, peerRESTMethodDiskHwOBDInfo, nil, nil, -1)
	if callErr != nil {
		return info, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&info); decErr != nil {
		return info, decErr
	}
	return info, nil
}
// OsOBDInfo calls peerRESTMethodOsInfoOBDInfo on the remote node and
// gob-decodes the response body into info.
func (client *peerRESTClient) OsOBDInfo(ctx context.Context) (info madmin.ServerOsOBDInfo, err error) {
	body, callErr := client.callWithContext(ctx, peerRESTMethodOsInfoOBDInfo, nil, nil, -1)
	if callErr != nil {
		return info, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&info); decErr != nil {
		return info, decErr
	}
	return info, nil
}
// MemOBDInfo calls peerRESTMethodMemOBDInfo on the remote node and
// gob-decodes the response body into info.
func (client *peerRESTClient) MemOBDInfo(ctx context.Context) (info madmin.ServerMemOBDInfo, err error) {
	body, callErr := client.callWithContext(ctx, peerRESTMethodMemOBDInfo, nil, nil, -1)
	if callErr != nil {
		return info, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&info); decErr != nil {
		return info, decErr
	}
	return info, nil
}
// ProcOBDInfo calls peerRESTMethodProcOBDInfo on the remote node and
// gob-decodes the response body into info.
func (client *peerRESTClient) ProcOBDInfo(ctx context.Context) (info madmin.ServerProcOBDInfo, err error) {
	body, callErr := client.callWithContext(ctx, peerRESTMethodProcOBDInfo, nil, nil, -1)
	if callErr != nil {
		return info, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&info); decErr != nil {
		return info, decErr
	}
	return info, nil
}
// StartProfiling sends the profiler name to the peer via
// peerRESTMethodStartProfiling and discards the response body.
func (client *peerRESTClient) StartProfiling(profiler string) error {
	query := url.Values{peerRESTProfiler: []string{profiler}}

	body, err := client.call(peerRESTMethodStartProfiling, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// DownloadProfileData calls peerRESTMethodDownloadProfilingData on the remote
// node and gob-decodes the response body into data.
func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err error) {
	body, callErr := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1)
	if callErr != nil {
		return data, callErr
	}
	defer http.DrainBody(body)

	if decErr := gob.NewDecoder(body).Decode(&data); decErr != nil {
		return data, decErr
	}
	return data, nil
}
// DeleteBucket sends the bucket name to the peer via
// peerRESTMethodDeleteBucket and discards the response body.
func (client *peerRESTClient) DeleteBucket(bucket string) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	body, err := client.call(peerRESTMethodDeleteBucket, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// ReloadFormat calls peerRESTMethodReloadFormat on the peer, passing dryRun
// as the string "true" or "false", and discards the response body.
func (client *peerRESTClient) ReloadFormat(dryRun bool) error {
	query := url.Values{peerRESTDryRun: []string{strconv.FormatBool(dryRun)}}

	body, err := client.call(peerRESTMethodReloadFormat, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// RemoveBucketPolicy sends the bucket name to the peer via
// peerRESTMethodBucketPolicyRemove and discards the response body.
func (client *peerRESTClient) RemoveBucketPolicy(bucket string) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	body, err := client.call(peerRESTMethodBucketPolicyRemove, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// RemoveBucketObjectLockConfig sends the bucket name to the peer via
// peerRESTMethodBucketObjectLockConfigRemove and discards the response body.
func (client *peerRESTClient) RemoveBucketObjectLockConfig(bucket string) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	body, err := client.call(peerRESTMethodBucketObjectLockConfigRemove, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// cycleServerBloomFilter will cycle the bloom filter to start recording to index y if not already.
// The response will contain a bloom filter starting at index x up to, but not including index y.
// If y is 0, the response will not update y, but return the currently recorded information
// from the current x to y-1.
//
// req is gob-encoded as the request body of peerRESTMethodCycleBloom and the
// response is gob-decoded into the returned bloomFilterResponse. The request
// is bound to ctx, so cancelling ctx aborts the call. On a decode error the
// (possibly partially filled) response is returned together with the error.
func (client *peerRESTClient) cycleServerBloomFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) {
	var reader bytes.Buffer
	if err := gob.NewEncoder(&reader).Encode(req); err != nil {
		return nil, err
	}

	// Use the caller's context instead of GlobalContext so that the
	// caller's cancellation and deadlines are honoured.
	respBody, err := client.callWithContext(ctx, peerRESTMethodCycleBloom, nil, &reader, -1)
	if err != nil {
		return nil, err
	}
	defer http.DrainBody(respBody)

	var resp bloomFilterResponse
	return &resp, gob.NewDecoder(respBody).Decode(&resp)
}
// SetBucketPolicy gob-encodes bucketPolicy and sends it, together with the
// bucket name, to the peer via peerRESTMethodBucketPolicySet. The response
// body is discarded.
func (client *peerRESTClient) SetBucketPolicy(bucket string, bucketPolicy *policy.Policy) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	var payload bytes.Buffer
	if err := gob.NewEncoder(&payload).Encode(bucketPolicy); err != nil {
		return err
	}

	body, err := client.call(peerRESTMethodBucketPolicySet, query, &payload, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// RemoveBucketLifecycle sends the bucket name to the peer via
// peerRESTMethodBucketLifecycleRemove and discards the response body.
func (client *peerRESTClient) RemoveBucketLifecycle(bucket string) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	body, err := client.call(peerRESTMethodBucketLifecycleRemove, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// SetBucketLifecycle gob-encodes bucketLifecycle and sends it, together with
// the bucket name, to the peer via peerRESTMethodBucketLifecycleSet. The
// response body is discarded.
func (client *peerRESTClient) SetBucketLifecycle(bucket string, bucketLifecycle *lifecycle.Lifecycle) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	var payload bytes.Buffer
	if err := gob.NewEncoder(&payload).Encode(bucketLifecycle); err != nil {
		return err
	}

	body, err := client.call(peerRESTMethodBucketLifecycleSet, query, &payload, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// RemoveBucketSSEConfig sends the bucket name to the peer via
// peerRESTMethodBucketEncryptionRemove and discards the response body.
func (client *peerRESTClient) RemoveBucketSSEConfig(bucket string) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	body, err := client.call(peerRESTMethodBucketEncryptionRemove, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// SetBucketSSEConfig gob-encodes encConfig and sends it, together with the
// bucket name, to the peer via peerRESTMethodBucketEncryptionSet. The
// response body is discarded.
func (client *peerRESTClient) SetBucketSSEConfig(bucket string, encConfig *bucketsse.BucketSSEConfig) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	var payload bytes.Buffer
	if err := gob.NewEncoder(&payload).Encode(encConfig); err != nil {
		return err
	}

	body, err := client.call(peerRESTMethodBucketEncryptionSet, query, &payload, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// PutBucketNotification gob-encodes rulesMap and sends it, together with the
// bucket name, to the peer via peerRESTMethodBucketNotificationPut. The
// response body is discarded.
func (client *peerRESTClient) PutBucketNotification(bucket string, rulesMap event.RulesMap) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	var payload bytes.Buffer
	if err := gob.NewEncoder(&payload).Encode(&rulesMap); err != nil {
		return err
	}

	body, err := client.call(peerRESTMethodBucketNotificationPut, query, &payload, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// PutBucketObjectLockConfig gob-encodes retention and sends it, together with
// the bucket name, to the peer via peerRESTMethodPutBucketObjectLockConfig.
// The response body is discarded.
func (client *peerRESTClient) PutBucketObjectLockConfig(bucket string, retention objectlock.Retention) error {
	query := url.Values{peerRESTBucket: []string{bucket}}

	var payload bytes.Buffer
	if err := gob.NewEncoder(&payload).Encode(&retention); err != nil {
		return err
	}

	body, err := client.call(peerRESTMethodPutBucketObjectLockConfig, query, &payload, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// DeletePolicy sends the policy name to the peer via
// peerRESTMethodDeletePolicy and discards the response body.
func (client *peerRESTClient) DeletePolicy(policyName string) (err error) {
	query := url.Values{peerRESTPolicy: []string{policyName}}

	body, callErr := client.call(peerRESTMethodDeletePolicy, query, nil, -1)
	if callErr != nil {
		return callErr
	}
	http.DrainBody(body)
	return nil
}
// LoadPolicy sends the policy name to the peer via peerRESTMethodLoadPolicy
// and discards the response body.
func (client *peerRESTClient) LoadPolicy(policyName string) (err error) {
	query := url.Values{peerRESTPolicy: []string{policyName}}

	body, callErr := client.call(peerRESTMethodLoadPolicy, query, nil, -1)
	if callErr != nil {
		return callErr
	}
	http.DrainBody(body)
	return nil
}
// LoadPolicyMapping calls peerRESTMethodLoadPolicyMapping on the peer with
// the user or group name. When isGroup is true the peerRESTIsGroup parameter
// is added with an empty value. The response body is discarded.
func (client *peerRESTClient) LoadPolicyMapping(userOrGroup string, isGroup bool) error {
	query := make(url.Values)
	query.Set(peerRESTUserOrGroup, userOrGroup)
	if isGroup {
		query.Set(peerRESTIsGroup, "")
	}

	body, err := client.call(peerRESTMethodLoadPolicyMapping, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// DeleteUser sends the access key to the peer via peerRESTMethodDeleteUser
// and discards the response body.
func (client *peerRESTClient) DeleteUser(accessKey string) (err error) {
	query := url.Values{peerRESTUser: []string{accessKey}}

	body, callErr := client.call(peerRESTMethodDeleteUser, query, nil, -1)
	if callErr != nil {
		return callErr
	}
	http.DrainBody(body)
	return nil
}
// DeleteServiceAccount sends the access key to the peer via
// peerRESTMethodDeleteServiceAccount and discards the response body.
func (client *peerRESTClient) DeleteServiceAccount(accessKey string) (err error) {
	query := url.Values{peerRESTUser: []string{accessKey}}

	body, callErr := client.call(peerRESTMethodDeleteServiceAccount, query, nil, -1)
	if callErr != nil {
		return callErr
	}
	http.DrainBody(body)
	return nil
}
// LoadUser calls peerRESTMethodLoadUser on the peer with the access key and
// the temp flag rendered as "true" or "false". The response body is discarded.
func (client *peerRESTClient) LoadUser(accessKey string, temp bool) (err error) {
	query := make(url.Values)
	query.Set(peerRESTUser, accessKey)
	query.Set(peerRESTUserTemp, strconv.FormatBool(temp))

	body, callErr := client.call(peerRESTMethodLoadUser, query, nil, -1)
	if callErr != nil {
		return callErr
	}
	http.DrainBody(body)
	return nil
}
// LoadServiceAccount sends the access key to the peer via
// peerRESTMethodLoadServiceAccount and discards the response body.
func (client *peerRESTClient) LoadServiceAccount(accessKey string) (err error) {
	query := url.Values{peerRESTUser: []string{accessKey}}

	body, callErr := client.call(peerRESTMethodLoadServiceAccount, query, nil, -1)
	if callErr != nil {
		return callErr
	}
	http.DrainBody(body)
	return nil
}
// LoadGroup sends the group name to the peer via peerRESTMethodLoadGroup and
// discards the response body.
func (client *peerRESTClient) LoadGroup(group string) error {
	query := url.Values{peerRESTGroup: []string{group}}

	body, err := client.call(peerRESTMethodLoadGroup, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// ServerUpdate calls peerRESTMethodServerUpdate on the peer with the update
// URL, the SHA-256 hex string and the latest release time. The release time
// is formatted as RFC 3339, or sent as an empty string when it is the zero
// time. The response body is discarded.
func (client *peerRESTClient) ServerUpdate(updateURL, sha256Hex string, latestReleaseTime time.Time) error {
	release := ""
	if !latestReleaseTime.IsZero() {
		release = latestReleaseTime.Format(time.RFC3339)
	}

	query := make(url.Values)
	query.Set(peerRESTUpdateURL, updateURL)
	query.Set(peerRESTSha256Hex, sha256Hex)
	query.Set(peerRESTLatestRelease, release)

	body, err := client.call(peerRESTMethodServerUpdate, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// SignalService sends sig, rendered as a decimal integer, to the peer via
// peerRESTMethodSignalService and discards the response body.
func (client *peerRESTClient) SignalService(sig serviceSignal) error {
	query := url.Values{peerRESTSignal: []string{strconv.Itoa(int(sig))}}

	body, err := client.call(peerRESTMethodSignalService, query, nil, -1)
	if err != nil {
		return err
	}
	http.DrainBody(body)
	return nil
}
// BackgroundHealStatus calls peerRESTMethodBackgroundHealStatus on the peer
// and gob-decodes the response body into a madmin.BgHealState.
func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error) {
	var state madmin.BgHealState

	body, err := client.call(peerRESTMethodBackgroundHealStatus, nil, nil, -1)
	if err != nil {
		return madmin.BgHealState{}, err
	}
	defer http.DrainBody(body)

	err = gob.NewDecoder(body).Decode(&state)
	return state, err
}
// doTrace opens one trace stream to the peer via peerRESTMethodTrace and
// forwards every decoded trace.Info that carries a NodeName to traceCh.
// Sends to traceCh never block; entries are dropped when the receiver is not
// ready. The request is cancelled when doneCh is closed or when doTrace
// returns, and doTrace returns as soon as the call or a decode fails.
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh <-chan struct{}, trcAll, trcErr bool) {
	query := make(url.Values)
	query.Set(peerRESTTraceAll, strconv.FormatBool(trcAll))
	query.Set(peerRESTTraceErr, strconv.FormatBool(trcErr))

	// To cancel the REST request in case doneCh gets closed.
	reqCtx, stop := context.WithCancel(GlobalContext)

	finished := make(chan struct{})
	defer close(finished)

	go func() {
		defer stop()
		select {
		case <-doneCh:
		case <-finished:
			// doTrace returned, e.g. after an error in the REST request.
		}
	}()

	stream, err := client.callWithContext(reqCtx, peerRESTMethodTrace, query, nil, -1)
	defer http.DrainBody(stream)

	if err != nil {
		return
	}

	for dec := gob.NewDecoder(stream); ; {
		var entry trace.Info
		if dec.Decode(&entry) != nil {
			return
		}
		if len(entry.NodeName) == 0 {
			continue
		}
		select {
		case traceCh <- entry:
		default:
			// Do not block on slow receivers.
		}
	}
}
// doListen opens one event stream to the peer via peerRESTMethodListen, using
// v as the request parameters, and forwards every decoded event.Event that
// carries an EventVersion to listenCh. Sends to listenCh never block; events
// are dropped when the receiver is not ready. The request is cancelled when
// doneCh is closed or when doListen returns, and doListen returns as soon as
// the call or a decode fails.
func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh <-chan struct{}, v url.Values) {
	// To cancel the REST request in case doneCh gets closed.
	reqCtx, stop := context.WithCancel(GlobalContext)

	finished := make(chan struct{})
	defer close(finished)

	go func() {
		defer stop()
		select {
		case <-doneCh:
		case <-finished:
			// doListen returned, e.g. after an error in the REST request.
		}
	}()

	stream, err := client.callWithContext(reqCtx, peerRESTMethodListen, v, nil, -1)
	defer http.DrainBody(stream)

	if err != nil {
		return
	}

	for dec := gob.NewDecoder(stream); ; {
		var evt event.Event
		if dec.Decode(&evt) != nil {
			return
		}
		if len(evt.EventVersion) == 0 {
			continue
		}
		select {
		case listenCh <- evt:
		default:
			// Do not block on slow receivers.
		}
	}
}
// Listen starts a background goroutine that keeps calling doListen until
// doneCh is closed. After each doListen return that is not accompanied by a
// closed doneCh it waits 5 seconds before reconnecting.
func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh <-chan struct{}, v url.Values) {
	const retryDelay = 5 * time.Second

	go func() {
		for {
			client.doListen(listenCh, doneCh, v)

			select {
			case <-doneCh:
				return
			default:
			}

			// There was error in the REST request, retry after sometime as probably the peer is down.
			time.Sleep(retryDelay)
		}
	}()
}
// Trace starts a background goroutine that keeps calling doTrace until doneCh
// is closed. After each doTrace return that is not accompanied by a closed
// doneCh it waits 5 seconds before reconnecting.
func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh <-chan struct{}, trcAll, trcErr bool) {
	const retryDelay = 5 * time.Second

	go func() {
		for {
			client.doTrace(traceCh, doneCh, trcAll, trcErr)

			select {
			case <-doneCh:
				return
			default:
			}

			// There was error in the REST request, retry after sometime as probably the peer is down.
			time.Sleep(retryDelay)
		}
	}()
}
// ConsoleLog starts a background goroutine that streams console log entries
// from the peer (peerRESTMethodLog) into logCh until doneCh is closed.
//
// Each decoded madmin.LogInfo is offered to logCh without blocking; entries
// are dropped when the receiver is not ready. If the request fails, the
// goroutine waits 5 seconds and retries; if an established stream breaks, it
// reconnects immediately. Every attempt releases its context, its watcher
// goroutine and its response body before the next attempt starts.
func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) {
	go func() {
		for {
			// get cancellation context to properly unsubscribe peers
			ctx, cancel := context.WithCancel(GlobalContext)
			respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1)
			if err != nil {
				// Nothing was opened; release the context before retrying.
				cancel()
				// Retry the failed request.
				time.Sleep(5 * time.Second)
			} else {
				// Abort the stream when doneCh closes. The watcher also exits
				// once this attempt's context is cancelled, so it does not
				// pile up across reconnects.
				go func() {
					select {
					case <-doneCh:
						cancel()
					case <-ctx.Done():
					}
				}()

				dec := gob.NewDecoder(respBody)
				for {
					var entry madmin.LogInfo
					if err = dec.Decode(&entry); err != nil {
						break
					}
					select {
					case logCh <- entry:
					default:
						// Do not block on slow receivers.
					}
				}

				// Cancel first so draining cannot wait on a live stream.
				cancel()
				http.DrainBody(respBody)
			}

			select {
			case <-doneCh:
				return
			default:
				// There was error in the REST request, retry.
			}
		}
	}()
}
// getRemoteHosts parses every peer returned by GetRemotePeers(endpointZones)
// into an *xnet.Host. Peers that fail to parse are logged and skipped, so the
// result may be shorter than the peer list (and nil when nothing parsed).
func getRemoteHosts(endpointZones EndpointZones) []*xnet.Host {
	var hosts []*xnet.Host
	for _, peer := range GetRemotePeers(endpointZones) {
		parsed, err := xnet.ParseHost(peer)
		if err == nil {
			hosts = append(hosts, parsed)
			continue
		}
		logger.LogIf(GlobalContext, err)
	}
	return hosts
}
// newPeerRestClients creates one peer client per remote host derived from
// endpoints. The returned slice has one slot per host; when creating a client
// fails the error is logged and that slot is left nil.
func newPeerRestClients(endpoints EndpointZones) []*peerRESTClient {
	hosts := getRemoteHosts(endpoints)
	clients := make([]*peerRESTClient, len(hosts))
	for idx := range hosts {
		peerClient, err := newPeerRESTClient(hosts[idx])
		if err != nil {
			logger.LogIf(GlobalContext, err)
			continue
		}
		clients[idx] = peerClient
	}
	return clients
}
// newPeerRESTClient builds a peer REST client for the given host. The server
// URL uses peerRESTPath and the "http" scheme, or "https" with a TLS config
// (ServerName set to the peer name, RootCAs to globalRootCAs) when
// globalIsSSL is set. The returned client starts out marked as connected.
func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) {
	var tlsConfig *tls.Config

	serverURL := &url.URL{
		Scheme: "http",
		Host:   peer.String(),
		Path:   peerRESTPath,
	}
	if globalIsSSL {
		serverURL.Scheme = "https"
		tlsConfig = &tls.Config{
			ServerName: peer.Name,
			RootCAs:    globalRootCAs,
		}
	}

	transport := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
	restClient, err := rest.NewClient(serverURL, transport, newAuthToken)
	if err != nil {
		return nil, err
	}

	return &peerRESTClient{
		host:       peer,
		restClient: restClient,
		connected:  1,
	}, nil
}