aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/client.cpp
diff options
context:
space:
mode:
authorVitaly Isaev <vitalyisaev@ydb.tech>2024-12-12 15:39:00 +0000
committerVitaly Isaev <vitalyisaev@ydb.tech>2024-12-12 15:39:00 +0000
commit827b115675004838023427572a7c69f40a86a80a (patch)
treee99c953fe494b9de8d8597a15859d77c81f118c7 /yt/cpp/mapreduce/client/client.cpp
parent42701242eaf5be980cb935631586d0e90b82641c (diff)
parentfab222fd8176d00eee5ddafc6bce8cb95a6e3ab0 (diff)
downloadydb-827b115675004838023427572a7c69f40a86a80a.tar.gz
Merge branch 'rightlib' into rightlib_20241212
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp37
1 files changed, 29 insertions, 8 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 43e3864ae2..a2d302212f 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -42,6 +42,7 @@
#include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
@@ -90,10 +91,12 @@ void ApplyProxyUrlAliasingRules(TString& url)
////////////////////////////////////////////////////////////////////////////////
TClientBase::TClientBase(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& transactionId,
IClientRetryPolicyPtr retryPolicy)
- : Context_(context)
+ : RawClient_(std::move(rawClient))
+ , Context_(context)
, TransactionId_(transactionId)
, ClientRetryPolicy_(std::move(retryPolicy))
{ }
@@ -101,7 +104,7 @@ TClientBase::TClientBase(
ITransactionPtr TClientBase::StartTransaction(
const TStartTransactionOptions& options)
{
- return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options);
+ return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options);
}
TNodeId TClientBase::Create(
@@ -138,7 +141,11 @@ void TClientBase::Set(
const TNode& value,
const TSetOptions& options)
{
- NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &value, &options] (TMutationId& mutationId) {
+ RawClient_->Set(mutationId, TransactionId_, path, value, options);
+ });
}
void TClientBase::MultisetAttributes(
@@ -826,6 +833,11 @@ IClientPtr TClientBase::GetParentClient()
return GetParentClientImpl();
}
+IRawClientPtr TClientBase::GetRawClient() const
+{
+ return RawClient_;
+}
+
const TClientContext& TClientBase::GetContext() const
{
return Context_;
@@ -839,11 +851,12 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
////////////////////////////////////////////////////////////////////////////////
TTransaction::TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& parentTransactionId,
const TStartTransactionOptions& options)
- : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy())
+ : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
MakeHolder<TPingableTransaction>(
@@ -858,11 +871,12 @@ TTransaction::TTransaction(
}
TTransaction::TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& transactionId,
const TAttachTransactionOptions& options)
- : TClientBase(context, transactionId, parentClient->GetRetryPolicy())
+ : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
new TPingableTransaction(
@@ -928,10 +942,11 @@ TClientPtr TTransaction::GetParentClientImpl()
////////////////////////////////////////////////////////////////////////////////
TClient::TClient(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& globalId,
IClientRetryPolicyPtr retryPolicy)
- : TClientBase(context, globalId, retryPolicy)
+ : TClientBase(std::move(rawClient), context, globalId, retryPolicy)
, TransactionPinger_(nullptr)
{ }
@@ -943,7 +958,7 @@ ITransactionPtr TClient::AttachTransaction(
{
CheckShutdown();
- return MakeIntrusive<TTransaction>(this, Context_, transactionId, options);
+ return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options);
}
void TClient::MountTable(
@@ -1435,9 +1450,15 @@ TClientPtr CreateClientImpl(
retryConfigProvider = CreateDefaultRetryConfigProvider();
}
+ auto rawClient = MakeIntrusive<THttpRawClient>(context);
+
EnsureInitialized();
- return new TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
+ return new TClient(
+ std::move(rawClient),
+ context,
+ globalTxId,
+ CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
}
////////////////////////////////////////////////////////////////////////////////