aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client
diff options
context:
space:
mode:
authorMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-12-13 10:22:13 +0000
committerGitHub <noreply@github.com>2024-12-13 10:22:13 +0000
commite73e490feb4e1f63d097697324aa48b643a62317 (patch)
treef63fe3d15819a5148ade51609c5211251d93e425 /yt/cpp/mapreduce/client
parent19346460a8060a0ed4731edb192745642ff34b3d (diff)
parent4dde77404d1eae4a633d1cc3807142409a9938eb (diff)
downloadydb-e73e490feb4e1f63d097697324aa48b643a62317.tar.gz
Merge pull request #12582 from vitalyisaev2/rightlib_20241212
Merge rightlib 20241212
Diffstat (limited to 'yt/cpp/mapreduce/client')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp37
-rw-r--r--yt/cpp/mapreduce/client/client.h9
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp5
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.h1
4 files changed, 43 insertions, 9 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 43e3864ae2..a2d302212f 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -42,6 +42,7 @@
#include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
@@ -90,10 +91,12 @@ void ApplyProxyUrlAliasingRules(TString& url)
////////////////////////////////////////////////////////////////////////////////
TClientBase::TClientBase(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& transactionId,
IClientRetryPolicyPtr retryPolicy)
- : Context_(context)
+ : RawClient_(std::move(rawClient))
+ , Context_(context)
, TransactionId_(transactionId)
, ClientRetryPolicy_(std::move(retryPolicy))
{ }
@@ -101,7 +104,7 @@ TClientBase::TClientBase(
ITransactionPtr TClientBase::StartTransaction(
const TStartTransactionOptions& options)
{
- return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options);
+ return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options);
}
TNodeId TClientBase::Create(
@@ -138,7 +141,11 @@ void TClientBase::Set(
const TNode& value,
const TSetOptions& options)
{
- NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &value, &options] (TMutationId& mutationId) {
+ RawClient_->Set(mutationId, TransactionId_, path, value, options);
+ });
}
void TClientBase::MultisetAttributes(
@@ -826,6 +833,11 @@ IClientPtr TClientBase::GetParentClient()
return GetParentClientImpl();
}
+IRawClientPtr TClientBase::GetRawClient() const
+{
+ return RawClient_;
+}
+
const TClientContext& TClientBase::GetContext() const
{
return Context_;
@@ -839,11 +851,12 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
////////////////////////////////////////////////////////////////////////////////
TTransaction::TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& parentTransactionId,
const TStartTransactionOptions& options)
- : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy())
+ : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
MakeHolder<TPingableTransaction>(
@@ -858,11 +871,12 @@ TTransaction::TTransaction(
}
TTransaction::TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& transactionId,
const TAttachTransactionOptions& options)
- : TClientBase(context, transactionId, parentClient->GetRetryPolicy())
+ : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
new TPingableTransaction(
@@ -928,10 +942,11 @@ TClientPtr TTransaction::GetParentClientImpl()
////////////////////////////////////////////////////////////////////////////////
TClient::TClient(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& globalId,
IClientRetryPolicyPtr retryPolicy)
- : TClientBase(context, globalId, retryPolicy)
+ : TClientBase(std::move(rawClient), context, globalId, retryPolicy)
, TransactionPinger_(nullptr)
{ }
@@ -943,7 +958,7 @@ ITransactionPtr TClient::AttachTransaction(
{
CheckShutdown();
- return MakeIntrusive<TTransaction>(this, Context_, transactionId, options);
+ return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options);
}
void TClient::MountTable(
@@ -1435,9 +1450,15 @@ TClientPtr CreateClientImpl(
retryConfigProvider = CreateDefaultRetryConfigProvider();
}
+ auto rawClient = MakeIntrusive<THttpRawClient>(context);
+
EnsureInitialized();
- return new TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
+ return new TClient(
+ std::move(rawClient),
+ context,
+ globalTxId,
+ CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h
index 32c458316d..2a0398d056 100644
--- a/yt/cpp/mapreduce/client/client.h
+++ b/yt/cpp/mapreduce/client/client.h
@@ -5,6 +5,7 @@
#include "transaction_pinger.h"
#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
@@ -29,6 +30,7 @@ class TClientBase
{
public:
TClientBase(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& transactionId,
IClientRetryPolicyPtr retryPolicy);
@@ -222,6 +224,8 @@ public:
IClientPtr GetParentClient() override;
+ IRawClientPtr GetRawClient() const;
+
const TClientContext& GetContext() const;
const IClientRetryPolicyPtr& GetRetryPolicy() const;
@@ -232,6 +236,7 @@ protected:
virtual TClientPtr GetParentClientImpl() = 0;
protected:
+ const IRawClientPtr RawClient_;
const TClientContext Context_;
TTransactionId TransactionId_;
IClientRetryPolicyPtr ClientRetryPolicy_;
@@ -287,6 +292,7 @@ public:
//
// Start a new transaction.
TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& parentTransactionId,
@@ -295,6 +301,7 @@ public:
//
// Attach an existing transaction.
TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& transactionId,
@@ -325,6 +332,7 @@ protected:
TClientPtr GetParentClientImpl() override;
private:
+ const IRawClientPtr RawClient_;
ITransactionPingerPtr TransactionPinger_;
THolder<TPingableTransaction> PingableTx_;
TClientPtr ParentClient_;
@@ -338,6 +346,7 @@ class TClient
{
public:
TClient(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& globalId,
IClientRetryPolicyPtr retryPolicy);
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index 07d00e88d6..a6d424c5a1 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -17,6 +17,7 @@
#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
#include <yt/cpp/mapreduce/interface/error_codes.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
@@ -395,7 +396,8 @@ TJobPreparer::TJobPreparer(
size_t outputTableCount,
const TVector<TSmallJobFile>& smallFileList,
const TOperationOptions& options)
- : OperationPreparer_(operationPreparer)
+ : RawClient_(operationPreparer.GetClient()->GetRawClient())
+ , OperationPreparer_(operationPreparer)
, Spec_(spec)
, Options_(options)
, Layers_(spec.Layers_)
@@ -631,6 +633,7 @@ TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& it
CreateFileInCypress(cypressPath);
auto uploadTx = MakeIntrusive<TTransaction>(
+ OperationPreparer_.GetClient()->GetRawClient(),
OperationPreparer_.GetClient(),
OperationPreparer_.GetContext(),
TTransactionId(),
diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h
index 54c978c0fb..41594eb52e 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.h
+++ b/yt/cpp/mapreduce/client/operation_preparer.h
@@ -83,6 +83,7 @@ public:
bool ShouldRedirectStdoutToStderr() const;
private:
+ const IRawClientPtr RawClient_;
TOperationPreparer& OperationPreparer_;
TUserJobSpec Spec_;
TOperationOptions Options_;