Enable multiple concurrent profile types (#8792)

master
Klaus Post 5 years ago committed by kannappanr
parent 686d4656de
commit 2bf6cf0e15
  1. 36
      cmd/admin-handlers.go
  2. 93
      cmd/notification.go
  3. 2
      cmd/peer-rest-client.go
  4. 37
      cmd/peer-rest-server.go
  5. 8
      cmd/signals.go
  6. 149
      cmd/utils.go
  7. 2
      cmd/utils_test.go
  8. 1
      go.mod

@ -571,30 +571,44 @@ func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.R
} }
vars := mux.Vars(r) vars := mux.Vars(r)
profiler := vars["profilerType"] profiles := strings.Split(vars["profilerType"], ",")
thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints))
if err != nil { if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return return
} }
// Start profiling on remote servers. globalProfilerMu.Lock()
hostErrs := globalNotificationSys.StartProfiling(profiler) defer globalProfilerMu.Unlock()
// Start profiling locally as well. if globalProfiler == nil {
{ globalProfiler = make(map[string]minioProfiler, 10)
if globalProfiler != nil { }
globalProfiler.Stop()
// Stop profiler of all types if already running
for k, v := range globalProfiler {
for _, p := range profiles {
if p == k {
v.Stop()
delete(globalProfiler, k)
}
} }
prof, err := startProfiler(profiler, "") }
// Start profiling on remote servers.
var hostErrs []NotificationPeerErr
for _, profiler := range profiles {
hostErrs = append(hostErrs, globalNotificationSys.StartProfiling(profiler)...)
// Start profiling locally as well.
prof, err := startProfiler(profiler)
if err != nil { if err != nil {
hostErrs = append(hostErrs, NotificationPeerErr{ hostErrs = append(hostErrs, NotificationPeerErr{
Host: *thisAddr, Host: *thisAddr,
Err: err, Err: err,
}) })
} else { } else {
globalProfiler = prof globalProfiler[profiler] = prof
hostErrs = append(hostErrs, NotificationPeerErr{ hostErrs = append(hostErrs, NotificationPeerErr{
Host: *thisAddr, Host: *thisAddr,
}) })
@ -620,7 +634,7 @@ func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.R
return return
} }
writeSuccessResponseJSON(w, []byte(startProfilingResultInBytes)) writeSuccessResponseJSON(w, startProfilingResultInBytes)
} }
// dummyFileInfo represents a dummy representation of a profile data file // dummyFileInfo represents a dummy representation of a profile data file

@ -319,36 +319,39 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
profilingDataFound = true profilingDataFound = true
// Send profiling data to zip as file for typ, data := range data {
header, zerr := zip.FileInfoHeader(dummyFileInfo{ // Send profiling data to zip as file
name: fmt.Sprintf("profiling-%s.pprof", client.host.String()), header, zerr := zip.FileInfoHeader(dummyFileInfo{
size: int64(len(data)), name: fmt.Sprintf("profiling-%s-%s.pprof", client.host.String(), typ),
mode: 0600, size: int64(len(data)),
modTime: UTCNow(), mode: 0600,
isDir: false, modTime: UTCNow(),
sys: nil, isDir: false,
}) sys: nil,
if zerr != nil { })
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) if zerr != nil {
ctx := logger.SetReqInfo(ctx, reqInfo) reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
logger.LogIf(ctx, zerr) ctx := logger.SetReqInfo(ctx, reqInfo)
continue logger.LogIf(ctx, zerr)
} continue
zwriter, zerr := zipWriter.CreateHeader(header) }
if zerr != nil { zwriter, zerr := zipWriter.CreateHeader(header)
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) if zerr != nil {
ctx := logger.SetReqInfo(ctx, reqInfo) reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
logger.LogIf(ctx, zerr) ctx := logger.SetReqInfo(ctx, reqInfo)
continue logger.LogIf(ctx, zerr)
} continue
if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { }
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil {
ctx := logger.SetReqInfo(ctx, reqInfo) reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
logger.LogIf(ctx, err) ctx := logger.SetReqInfo(ctx, reqInfo)
continue logger.LogIf(ctx, err)
continue
}
} }
} }
// Local host
thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints))
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
@ -366,25 +369,27 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
profilingDataFound = true profilingDataFound = true
// Send profiling data to zip as file // Send profiling data to zip as file
header, zerr := zip.FileInfoHeader(dummyFileInfo{ for typ, data := range data {
name: fmt.Sprintf("profiling-%s.pprof", thisAddr), header, zerr := zip.FileInfoHeader(dummyFileInfo{
size: int64(len(data)), name: fmt.Sprintf("profiling-%s-%s.pprof", thisAddr, typ),
mode: 0600, size: int64(len(data)),
modTime: UTCNow(), mode: 0600,
isDir: false, modTime: UTCNow(),
sys: nil, isDir: false,
}) sys: nil,
if zerr != nil { })
return profilingDataFound if zerr != nil {
} return profilingDataFound
}
zwriter, zerr := zipWriter.CreateHeader(header) zwriter, zerr := zipWriter.CreateHeader(header)
if zerr != nil { if zerr != nil {
return profilingDataFound return profilingDataFound
} }
if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil {
return profilingDataFound return profilingDataFound
}
} }
return profilingDataFound return profilingDataFound

@ -225,7 +225,7 @@ func (client *peerRESTClient) StartProfiling(profiler string) error {
} }
// DownloadProfileData - download profiled data from a remote node. // DownloadProfileData - download profiled data from a remote node.
func (client *peerRESTClient) DownloadProfileData() (data []byte, err error) { func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err error) {
respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1) respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1)
if err != nil { if err != nil {
return return

@ -395,28 +395,41 @@ func (s *peerRESTServer) StartProfilingHandler(w http.ResponseWriter, r *http.Re
} }
vars := mux.Vars(r) vars := mux.Vars(r)
profiler := vars[peerRESTProfiler] profiles := strings.Split(vars[peerRESTProfiler], ",")
if profiler == "" { if len(profiles) == 0 {
s.writeErrorResponse(w, errors.New("profiler name is missing")) s.writeErrorResponse(w, errors.New("profiler name is missing"))
return return
} }
globalProfilerMu.Lock()
defer globalProfilerMu.Unlock()
if globalProfiler == nil {
globalProfiler = make(map[string]minioProfiler, 10)
}
if globalProfiler != nil { // Stop profiler of all types if already running
globalProfiler.Stop() for k, v := range globalProfiler {
for _, p := range profiles {
if p == k {
v.Stop()
delete(globalProfiler, k)
}
}
} }
var err error for _, profiler := range profiles {
globalProfiler, err = startProfiler(profiler, "") prof, err := startProfiler(profiler)
if err != nil { if err != nil {
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
}
globalProfiler[profiler] = prof
} }
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
// DownloadProflingDataHandler - returns proflied data. // DownloadProfilingDataHandler - returns profiled data.
func (s *peerRESTServer) DownloadProflingDataHandler(w http.ResponseWriter, r *http.Request) { func (s *peerRESTServer) DownloadProfilingDataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) { if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request")) s.writeErrorResponse(w, errors.New("Invalid request"))
return return
@ -1156,7 +1169,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadGroup).HandlerFunc(httpTraceAll(server.LoadGroupHandler)).Queries(restQueries(peerRESTGroup)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadGroup).HandlerFunc(httpTraceAll(server.LoadGroupHandler)).Queries(restQueries(peerRESTGroup)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProflingDataHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(httpTraceHdrs(server.DownloadProfilingDataHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTargetExists).HandlerFunc(httpTraceHdrs(server.TargetExistsHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSendEvent).HandlerFunc(httpTraceHdrs(server.SendEventHandler)).Queries(restQueries(peerRESTBucket)...)

@ -28,8 +28,12 @@ func handleSignals() {
// Custom exit function // Custom exit function
exit := func(success bool) { exit := func(success bool) {
// If global profiler is set stop before we exit. // If global profiler is set stop before we exit.
if globalProfiler != nil { globalProfilerMu.Lock()
globalProfiler.Stop() defer globalProfilerMu.Unlock()
if len(globalProfiler) > 0 {
for _, p := range globalProfiler {
p.Stop()
}
} }
if success { if success {

@ -32,7 +32,11 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
"runtime"
"runtime/pprof"
"runtime/trace"
"strings" "strings"
"sync"
"time" "time"
"github.com/beevik/ntp" "github.com/beevik/ntp"
@ -42,7 +46,6 @@ import (
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/pkg/profile"
) )
// IsErrIgnored returns whether given error is ignored or not. // IsErrIgnored returns whether given error is ignored or not.
@ -187,94 +190,130 @@ func contains(slice interface{}, elem interface{}) bool {
// provide any API to calculate the profiler file path in the // provide any API to calculate the profiler file path in the
// disk since the name of this latter is randomly generated. // disk since the name of this latter is randomly generated.
type profilerWrapper struct { type profilerWrapper struct {
stopFn func() stopFn func() ([]byte, error)
pathFn func() string
} }
func (p profilerWrapper) Stop() { func (p profilerWrapper) Stop() ([]byte, error) {
p.stopFn() return p.stopFn()
}
func (p profilerWrapper) Path() string {
return p.pathFn()
} }
// Returns current profile data, returns error if there is no active // Returns current profile data, returns error if there is no active
// profiling in progress. Stops an active profile. // profiling in progress. Stops an active profile.
func getProfileData() ([]byte, error) { func getProfileData() (map[string][]byte, error) {
if globalProfiler == nil { globalProfilerMu.Lock()
defer globalProfilerMu.Unlock()
if len(globalProfiler) == 0 {
return nil, errors.New("profiler not enabled") return nil, errors.New("profiler not enabled")
} }
profilerPath := globalProfiler.Path() dst := make(map[string][]byte, len(globalProfiler))
for typ, prof := range globalProfiler {
// Stop the profiler // Stop the profiler
globalProfiler.Stop() var err error
buf, err := prof.Stop()
profilerFile, err := os.Open(profilerPath) delete(globalProfiler, typ)
if err != nil { if err == nil {
return nil, err dst[typ] = buf
}
} }
return dst, nil
return ioutil.ReadAll(profilerFile)
} }
// Starts a profiler returns nil if profiler is not enabled, caller needs to handle this. // Starts a profiler returns nil if profiler is not enabled, caller needs to handle this.
func startProfiler(profilerType, dirPath string) (minioProfiler, error) { func startProfiler(profilerType string) (minioProfiler, error) {
var err error var prof profilerWrapper
if dirPath == "" {
dirPath, err = ioutil.TempDir("", "profile")
if err != nil {
return nil, err
}
}
var profiler interface {
Stop()
}
var profilerFileName string
// Enable profiler and set the name of the file that pkg/pprof // Enable profiler and set the name of the file that pkg/pprof
// library creates to store profiling data. // library creates to store profiling data.
switch profilerType { switch profilerType {
case "cpu": case "cpu":
profiler = profile.Start(profile.CPUProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) dirPath, err := ioutil.TempDir("", "profile")
profilerFileName = "cpu.pprof" if err != nil {
return nil, err
}
fn := filepath.Join(dirPath, "cpu.out")
f, err := os.Create(fn)
if err != nil {
return nil, err
}
err = pprof.StartCPUProfile(f)
if err != nil {
return nil, err
}
prof.stopFn = func() ([]byte, error) {
pprof.StopCPUProfile()
err := f.Close()
if err != nil {
return nil, err
}
defer os.RemoveAll(dirPath)
return ioutil.ReadFile(fn)
}
case "mem": case "mem":
profiler = profile.Start(profile.MemProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) old := runtime.MemProfileRate
profilerFileName = "mem.pprof" runtime.MemProfileRate = 1
prof.stopFn = func() ([]byte, error) {
var buf bytes.Buffer
runtime.MemProfileRate = old
err := pprof.Lookup("heap").WriteTo(&buf, 0)
return buf.Bytes(), err
}
case "block": case "block":
profiler = profile.Start(profile.BlockProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) runtime.SetBlockProfileRate(1)
profilerFileName = "block.pprof" prof.stopFn = func() ([]byte, error) {
var buf bytes.Buffer
runtime.SetBlockProfileRate(0)
err := pprof.Lookup("block").WriteTo(&buf, 0)
return buf.Bytes(), err
}
case "mutex": case "mutex":
profiler = profile.Start(profile.MutexProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) runtime.SetMutexProfileFraction(1)
profilerFileName = "mutex.pprof" prof.stopFn = func() ([]byte, error) {
var buf bytes.Buffer
runtime.SetMutexProfileFraction(0)
err := pprof.Lookup("mutex").WriteTo(&buf, 0)
return buf.Bytes(), err
}
case "trace": case "trace":
profiler = profile.Start(profile.TraceProfile, profile.NoShutdownHook, profile.ProfilePath(dirPath)) dirPath, err := ioutil.TempDir("", "profile")
profilerFileName = "trace.out" if err != nil {
return nil, err
}
fn := filepath.Join(dirPath, "trace.out")
f, err := os.Create(fn)
if err != nil {
return nil, err
}
err = trace.Start(f)
if err != nil {
return nil, err
}
prof.stopFn = func() ([]byte, error) {
trace.Stop()
err := f.Close()
if err != nil {
return nil, err
}
defer os.RemoveAll(dirPath)
return ioutil.ReadFile(fn)
}
default: default:
return nil, errors.New("profiler type unknown") return nil, errors.New("profiler type unknown")
} }
return &profilerWrapper{ return prof, nil
stopFn: profiler.Stop,
pathFn: func() string {
return filepath.Join(dirPath, profilerFileName)
},
}, nil
} }
// minioProfiler - minio profiler interface. // minioProfiler - minio profiler interface.
type minioProfiler interface { type minioProfiler interface {
// Stop the profiler // Stop the profiler
Stop() Stop() ([]byte, error)
// Return the path of the profiling file
Path() string
} }
// Global profiler to be used by service go-routine. // Global profiler to be used by service go-routine.
var globalProfiler minioProfiler var globalProfiler map[string]minioProfiler
var globalProfilerMu sync.Mutex
// dump the request into a string in JSON format. // dump the request into a string in JSON format.
func dumpRequest(r *http.Request) string { func dumpRequest(r *http.Request) string {

@ -191,7 +191,7 @@ func TestURL2BucketObjectName(t *testing.T) {
// Add tests for starting and stopping different profilers. // Add tests for starting and stopping different profilers.
func TestStartProfiler(t *testing.T) { func TestStartProfiler(t *testing.T) {
_, err := startProfiler("", "") _, err := startProfiler("")
if err == nil { if err == nil {
t.Fatal("Expected a non nil error, but nil error returned for invalid profiler.") t.Fatal("Expected a non nil error, but nil error returned for invalid profiler.")
} }

@ -85,7 +85,6 @@ require (
github.com/ncw/directio v1.0.5 github.com/ncw/directio v1.0.5
github.com/nsqio/go-nsq v1.0.7 github.com/nsqio/go-nsq v1.0.7
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
github.com/pkg/profile v1.3.0
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect
github.com/rcrowley/go-metrics v0.0.0-20190704165056-9c2d0518ed81 // indirect github.com/rcrowley/go-metrics v0.0.0-20190704165056-9c2d0518ed81 // indirect

Loading…
Cancel
Save