1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
|
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CORE_LIB_SURFACE_CHANNEL_H
#define GRPC_SRC_CORE_LIB_SURFACE_CHANNEL_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <map>
#include <util/generic/string.h>
#include <util/string/cast.h>
#include <utility>
#include "y_absl/base/thread_annotations.h"
#include "y_absl/status/statusor.h"
#include "y_absl/strings/string_view.h"
#include "y_absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h>
#include <grpc/impl/compression_types.h>
#include <grpc/slice.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h" // IWYU pragma: keep
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/cpp_impl_of.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport_fwd.h"
/// The same as grpc_channel_destroy, but doesn't create an ExecCtx, and so
/// is safe to use from within core.
void grpc_channel_destroy_internal(grpc_channel* channel);
/// Create a call given a grpc_channel, in order to call \a method.
/// Progress is tied to activity on \a pollset_set. The returned call object is
/// meant to be used with \a grpc_call_start_batch_and_execute, which relies on
/// callbacks to signal completions. \a method and \a host need
/// only live through the invocation of this function. If \a parent_call is
/// non-NULL, it must be a server-side call. It will be used to propagate
/// properties from the server call to this new client call, depending on the
/// value of \a propagation_mask (see propagation_bits.h for possible values)
grpc_call* grpc_channel_create_pollset_set_call(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_pollset_set* pollset_set, const grpc_slice& method,
const grpc_slice* host, grpc_core::Timestamp deadline, void* reserved);
/// Get a (borrowed) pointer to this channels underlying channel stack
grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel);
grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
grpc_channel* channel);
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel);
void grpc_channel_update_call_size_estimate(grpc_channel* channel, size_t size);
namespace grpc_core {
struct RegisteredCall {
Slice path;
y_absl::optional<Slice> authority;
explicit RegisteredCall(const char* method_arg, const char* host_arg);
RegisteredCall(const RegisteredCall& other);
RegisteredCall& operator=(const RegisteredCall&) = delete;
~RegisteredCall();
};
struct CallRegistrationTable {
Mutex mu;
// The map key should be owned strings rather than unowned char*'s to
// guarantee that it outlives calls on the core channel (which may outlast the
// C++ or other wrapped language Channel that registered these calls).
std::map<std::pair<TString, TString>, RegisteredCall> map
Y_ABSL_GUARDED_BY(mu);
int method_registration_attempts Y_ABSL_GUARDED_BY(mu) = 0;
};
class Channel : public RefCounted<Channel>,
public CppImplOf<Channel, grpc_channel> {
public:
static y_absl::StatusOr<RefCountedPtr<Channel>> Create(
const char* target, ChannelArgs args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport);
static y_absl::StatusOr<RefCountedPtr<Channel>> CreateWithBuilder(
ChannelStackBuilder* builder);
grpc_channel_stack* channel_stack() const { return channel_stack_.get(); }
grpc_compression_options compression_options() const {
return compression_options_;
}
channelz::ChannelNode* channelz_node() const { return channelz_node_.get(); }
size_t CallSizeEstimate() {
// We round up our current estimate to the NEXT value of kRoundUpSize.
// This ensures:
// 1. a consistent size allocation when our estimate is drifting slowly
// (which is common) - which tends to help most allocators reuse memory
// 2. a small amount of allowed growth over the estimate without hitting
// the arena size doubling case, reducing overall memory usage
static constexpr size_t kRoundUpSize = 256;
return (call_size_estimate_.load(std::memory_order_relaxed) +
2 * kRoundUpSize) &
~(kRoundUpSize - 1);
}
void UpdateCallSizeEstimate(size_t size);
y_absl::string_view target() const { return target_; }
MemoryAllocator* allocator() { return &allocator_; }
bool is_client() const { return is_client_; }
bool is_promising() const { return is_promising_; }
RegisteredCall* RegisterCall(const char* method, const char* host);
int TestOnlyRegisteredCalls() {
MutexLock lock(®istration_table_.mu);
return registration_table_.map.size();
}
int TestOnlyRegistrationAttempts() {
MutexLock lock(®istration_table_.mu);
return registration_table_.method_registration_attempts;
}
grpc_event_engine::experimental::EventEngine* event_engine() const {
return channel_stack_->EventEngine();
}
private:
Channel(bool is_client, bool is_promising, TString target,
const ChannelArgs& channel_args,
grpc_compression_options compression_options,
RefCountedPtr<grpc_channel_stack> channel_stack);
const bool is_client_;
const bool is_promising_;
const grpc_compression_options compression_options_;
std::atomic<size_t> call_size_estimate_;
CallRegistrationTable registration_table_;
RefCountedPtr<channelz::ChannelNode> channelz_node_;
MemoryAllocator allocator_;
TString target_;
const RefCountedPtr<grpc_channel_stack> channel_stack_;
};
} // namespace grpc_core
inline grpc_compression_options grpc_channel_compression_options(
const grpc_channel* channel) {
return grpc_core::Channel::FromC(channel)->compression_options();
}
inline grpc_channel_stack* grpc_channel_get_channel_stack(
grpc_channel* channel) {
return grpc_core::Channel::FromC(channel)->channel_stack();
}
inline grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
grpc_channel* channel) {
return grpc_core::Channel::FromC(channel)->channelz_node();
}
inline void grpc_channel_internal_ref(grpc_channel* channel,
const char* reason) {
grpc_core::Channel::FromC(channel)->Ref(DEBUG_LOCATION, reason).release();
}
inline void grpc_channel_internal_unref(grpc_channel* channel,
const char* reason) {
grpc_core::Channel::FromC(channel)->Unref(DEBUG_LOCATION, reason);
}
// Return the channel's compression options.
grpc_compression_options grpc_channel_compression_options(
const grpc_channel* channel);
// Ping the channels peer (load balanced channels will select one sub-channel to
// ping); if the channel is not connected, posts a failed.
void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
void* tag, void* reserved);
#endif // GRPC_SRC_CORE_LIB_SURFACE_CHANNEL_H
|