aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSlusarenko Igor <s2m1@ydb.tech>2025-03-27 22:44:47 +0300
committerGitHub <noreply@github.com>2025-03-27 22:44:47 +0300
commit4a35432ea3dacc3ff490c510fd8bf2006e346e4c (patch)
tree63abb641201c115326d8d13e3d36b0893f3eec2b
parent5f9dbe8d4b6419438a6aba8810febb545a77c924 (diff)
downloadydb-4a35432ea3dacc3ff490c510fd8bf2006e346e4c.tar.gz
Add create context to all requests (#16319)
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/client.cpp25
1 files changed, 20 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp
index 18a111c3aa..0ec5fa03f4 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp
@@ -252,8 +252,10 @@ namespace NYql::NConnector {
promise.SetValue({std::move(status), std::move(NApi::TDescribeTableResponse())});
return promise.GetFuture();
}
+
+ auto context = CreateClientContext();
- auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NApi::TDescribeTableResponse&& resp) mutable {
+ auto callback = [context, promise](NYdbGrpc::TGrpcStatus&& status, NApi::TDescribeTableResponse&& resp) mutable {
promise.SetValue({std::move(status), std::move(resp)});
};
@@ -261,7 +263,8 @@ namespace NYql::NConnector {
std::move(request),
std::move(callback),
&NApi::Connector::Stub::AsyncDescribeTable,
- { .Timeout = timeout }
+ { .Timeout = timeout },
+ context.get()
);
return promise.GetFuture();
@@ -307,6 +310,16 @@ namespace NYql::NConnector {
}
private:
+ NYdbGrpc::IQueueClientContextPtr CreateClientContext() {
+ auto context = GrpcClient_->CreateContext();
+
+ if (!context) {
+ throw yexception() << "Client is being shut down";
+ }
+
+ return context;
+ }
+
void Init(const TGenericGatewayConfig& config) {
// TODO: place in a config file ?
GrpcClient_ = std::make_shared<NYdbGrpc::TGRpcClientLow>(DEFAULT_CONNECTION_MANAGER_NUM_THREADS);
@@ -378,10 +391,11 @@ namespace NYql::NConnector {
typename TRpcCallback = typename NYdbGrpc::TStreamRequestReadProcessor<NApi::Connector::Stub, TRequest, TResponse>::TAsyncRequest
>
TIteratorAsyncResult<IStreamIterator<TResponse>> ServerSideStreamingCall(
- const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) const {
+ const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) {
auto promise = NThreading::NewPromise<TIteratorResult<IStreamIterator<TResponse>>>();
+ auto context = CreateClientContext();
- auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr streamProcessor) mutable {
+ auto callback = [context, promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr streamProcessor) mutable {
if (!streamProcessor) {
promise.SetValue({std::move(status), nullptr});
return;
@@ -396,7 +410,8 @@ namespace NYql::NConnector {
std::move(request),
std::move(callback),
rpc,
- { .Timeout = timeout }
+ { .Timeout = timeout },
+ context.get()
);
return promise.GetFuture();