1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package service defines methods to register a gRPC client/service for a
// profiling service that is exposed in the same server. This service can be
// queried by a client to remotely manage the gRPC profiling behaviour of an
// application.
//
// # Experimental
//
// Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release.
package service
import (
"context"
"errors"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/profiling"
ppb "google.golang.org/grpc/profiling/proto"
)
var logger = grpclog.Component("profiling")
// ProfilingConfig defines configuration options for the Init method.
type ProfilingConfig struct {
// Setting this to true will enable profiling.
Enabled bool
// Profiling uses a circular buffer (ring buffer) to store statistics for
// only the last few RPCs so that profiling stats do not grow unbounded. This
// parameter defines the upper limit on the number of RPCs for which
// statistics should be stored at any given time. An average RPC requires
// approximately 2-3 KiB of memory for profiling-related statistics, so
// choose an appropriate number based on the amount of memory you can afford.
StreamStatsSize uint32
// To expose the profiling service and its methods, a *grpc.Server must be
// provided.
Server *grpc.Server
}
var errorNilServer = errors.New("profiling: no grpc.Server provided")
// Init takes a *ProfilingConfig to initialize profiling (turned on/off
// depending on the value set in pc.Enabled) and register the profiling service
// in the server provided in pc.Server.
func Init(pc *ProfilingConfig) error {
if pc.Server == nil {
return errorNilServer
}
if err := profiling.InitStats(pc.StreamStatsSize); err != nil {
return err
}
ppb.RegisterProfilingServer(pc.Server, getProfilingServerInstance())
// Do this last after everything has been initialized and allocated.
profiling.Enable(pc.Enabled)
return nil
}
type profilingServer struct {
ppb.UnimplementedProfilingServer
drainMutex sync.Mutex
}
var profilingServerInstance *profilingServer
var profilingServerOnce sync.Once
// getProfilingServerInstance creates and returns a singleton instance of
// profilingServer. Only one instance of profilingServer is created to use a
// shared mutex across all profilingServer instances.
func getProfilingServerInstance() *profilingServer {
profilingServerOnce.Do(func() {
profilingServerInstance = &profilingServer{}
})
return profilingServerInstance
}
func (s *profilingServer) Enable(ctx context.Context, req *ppb.EnableRequest) (*ppb.EnableResponse, error) {
if req.Enabled {
logger.Infof("profilingServer: Enable: enabling profiling")
} else {
logger.Infof("profilingServer: Enable: disabling profiling")
}
profiling.Enable(req.Enabled)
return &ppb.EnableResponse{}, nil
}
func timerToProtoTimer(timer *profiling.Timer) *ppb.Timer {
return &ppb.Timer{
Tags: timer.Tags,
BeginSec: timer.Begin.Unix(),
BeginNsec: int32(timer.Begin.Nanosecond()),
EndSec: timer.End.Unix(),
EndNsec: int32(timer.End.Nanosecond()),
GoId: timer.GoID,
}
}
func statToProtoStat(stat *profiling.Stat) *ppb.Stat {
protoStat := &ppb.Stat{
Tags: stat.Tags,
Timers: make([]*ppb.Timer, 0, len(stat.Timers)),
Metadata: stat.Metadata,
}
for _, t := range stat.Timers {
protoStat.Timers = append(protoStat.Timers, timerToProtoTimer(t))
}
return protoStat
}
func (s *profilingServer) GetStreamStats(ctx context.Context, req *ppb.GetStreamStatsRequest) (*ppb.GetStreamStatsResponse, error) {
// Since the drain operation is destructive, only one client request should
// be served at a time.
logger.Infof("profilingServer: GetStreamStats: processing request")
s.drainMutex.Lock()
results := profiling.StreamStats.Drain()
s.drainMutex.Unlock()
logger.Infof("profilingServer: GetStreamStats: returning %v records", len(results))
streamStats := make([]*ppb.Stat, 0)
for _, stat := range results {
streamStats = append(streamStats, statToProtoStat(stat.(*profiling.Stat)))
}
return &ppb.GetStreamStatsResponse{StreamStats: streamStats}, nil
}
|