diff options
author | Slusarenko Igor <s2m1@ydb.tech> | 2025-03-27 22:44:47 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-27 22:44:47 +0300 |
commit | 4a35432ea3dacc3ff490c510fd8bf2006e346e4c (patch) | |
tree | 63abb641201c115326d8d13e3d36b0893f3eec2b | |
parent | 5f9dbe8d4b6419438a6aba8810febb545a77c924 (diff) | |
download | ydb-4a35432ea3dacc3ff490c510fd8bf2006e346e4c.tar.gz |
Add create context to all requests (#16319)
-rw-r--r-- | ydb/library/yql/providers/generic/connector/libcpp/client.cpp | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp index 18a111c3aa..0ec5fa03f4 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp @@ -252,8 +252,10 @@ namespace NYql::NConnector { promise.SetValue({std::move(status), std::move(NApi::TDescribeTableResponse())}); return promise.GetFuture(); } + + auto context = CreateClientContext(); - auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NApi::TDescribeTableResponse&& resp) mutable { + auto callback = [context, promise](NYdbGrpc::TGrpcStatus&& status, NApi::TDescribeTableResponse&& resp) mutable { promise.SetValue({std::move(status), std::move(resp)}); }; @@ -261,7 +263,8 @@ namespace NYql::NConnector { std::move(request), std::move(callback), &NApi::Connector::Stub::AsyncDescribeTable, - { .Timeout = timeout } + { .Timeout = timeout }, + context.get() ); return promise.GetFuture(); @@ -307,6 +310,16 @@ namespace NYql::NConnector { } private: + NYdbGrpc::IQueueClientContextPtr CreateClientContext() { + auto context = GrpcClient_->CreateContext(); + + if (!context) { + throw yexception() << "Client is being shut down"; + } + + return context; + } + void Init(const TGenericGatewayConfig& config) { // TODO: place in a config file ? GrpcClient_ = std::make_shared<NYdbGrpc::TGRpcClientLow>(DEFAULT_CONNECTION_MANAGER_NUM_THREADS); @@ -378,10 +391,11 @@ namespace NYql::NConnector { typename TRpcCallback = typename NYdbGrpc::TStreamRequestReadProcessor<NApi::Connector::Stub, TRequest, TResponse>::TAsyncRequest > TIteratorAsyncResult<IStreamIterator<TResponse>> ServerSideStreamingCall( - const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) const { + const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) { auto promise = NThreading::NewPromise<TIteratorResult<IStreamIterator<TResponse>>>(); + auto context = CreateClientContext(); - auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr streamProcessor) mutable { + auto callback = [context, promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr streamProcessor) mutable { if (!streamProcessor) { promise.SetValue({std::move(status), nullptr}); return; @@ -396,7 +410,8 @@ namespace NYql::NConnector { std::move(request), std::move(callback), rpc, - { .Timeout = timeout } + { .Timeout = timeout }, + context.get() ); return promise.GetFuture(); |