summaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp60
-rw-r--r--yt/cpp/mapreduce/client/client.h4
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp15
-rw-r--r--yt/cpp/mapreduce/client/client_reader.h4
-rw-r--r--yt/cpp/mapreduce/client/client_writer.cpp10
-rw-r--r--yt/cpp/mapreduce/client/client_writer.h1
-rw-r--r--yt/cpp/mapreduce/client/file_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/client/file_reader.h4
-rw-r--r--yt/cpp/mapreduce/client/file_writer.cpp9
-rw-r--r--yt/cpp/mapreduce/client/file_writer.h1
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp31
-rw-r--r--yt/cpp/mapreduce/client/operation.h1
-rw-r--r--yt/cpp/mapreduce/client/operation_helpers.cpp41
-rw-r--r--yt/cpp/mapreduce/client/operation_helpers.h5
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp33
-rw-r--r--yt/cpp/mapreduce/client/prepare_operation.cpp19
-rw-r--r--yt/cpp/mapreduce/client/prepare_operation.h5
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.cpp4
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.h5
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.cpp4
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.h10
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer_v2.cpp3
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer_v2.h1
-rw-r--r--yt/cpp/mapreduce/client/structured_table_formats.cpp36
-rw-r--r--yt/cpp/mapreduce/client/structured_table_formats.h6
-rw-r--r--yt/cpp/mapreduce/client/transaction.cpp41
-rw-r--r--yt/cpp/mapreduce/client/transaction.h5
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h25
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp53
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h25
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp57
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h29
32 files changed, 376 insertions, 185 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index a2d302212f1..24aed7fb77f 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -126,14 +126,22 @@ bool TClientBase::Exists(
const TYPath& path,
const TExistsOptions& options)
{
- return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
+ return RequestWithRetry<bool>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &options] (TMutationId& mutationId) {
+ return RawClient_->Exists(mutationId, TransactionId_, path, options);
+ });
}
TNode TClientBase::Get(
const TYPath& path,
const TGetOptions& options)
{
- return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
+ return RequestWithRetry<TNode>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &options] (TMutationId& mutationId) {
+ return RawClient_->Get(mutationId, TransactionId_, path, options);
+ });
}
void TClientBase::Set(
@@ -149,12 +157,17 @@ void TClientBase::Set(
}
void TClientBase::MultisetAttributes(
- const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options)
+ const TYPath& path,
+ const TNode::TMapType& value,
+ const TMultisetAttributesOptions& options)
{
- NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &value, &options] (TMutationId& mutationId) {
+ RawClient_->MultisetAttributes(mutationId, TransactionId_, path, value, options);
+ });
}
-
TNode::TListType TClientBase::List(
const TYPath& path,
const TListOptions& options)
@@ -290,6 +303,7 @@ IFileReaderPtr TClientBase::CreateBlobTableReader(
return new TBlobTableReader(
path,
key,
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -303,6 +317,7 @@ IFileReaderPtr TClientBase::CreateFileReader(
{
return new TFileReader(
CanonizeYPath(path),
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -315,11 +330,18 @@ IFileWriterPtr TClientBase::CreateFileWriter(
const TFileWriterOptions& options)
{
auto realPath = CanonizeYPath(path);
- if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
+
+ auto exists = RequestWithRetry<bool>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &realPath] (TMutationId& mutationId) {
+ return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_);
+ });
+ if (!exists) {
NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE,
TCreateOptions().IgnoreExisting(true));
}
- return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
+
+ return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
}
TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
@@ -343,6 +365,7 @@ TRawTableWriterPtr TClientBase::CreateRawWriter(
const TTableWriterOptions& options)
{
return ::MakeIntrusive<TRetryfulWriter>(
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -673,7 +696,7 @@ void TClientBase::CompleteOperation(const TOperationId& operationId)
void TClientBase::WaitForOperation(const TOperationId& operationId)
{
- NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId);
+ NYT::NDetail::WaitForOperation(ClientRetryPolicy_, RawClient_, Context_, operationId);
}
void TClientBase::AlterTable(
@@ -691,6 +714,7 @@ void TClientBase::AlterTable(
{
return ::MakeIntrusive<TClientReader>(
CanonizeYPath(path),
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -706,12 +730,20 @@ THolder<TClientWriter> TClientBase::CreateClientWriter(
const TTableWriterOptions& options)
{
auto realPath = CanonizeYPath(path);
- if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
+
+ auto exists = RequestWithRetry<bool>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &realPath] (TMutationId& mutationId) {
+ return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_);
+ });
+ if (!exists) {
NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE,
TCreateOptions().IgnoreExisting(true));
}
+
return MakeHolder<TClientWriter>(
realPath,
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -851,15 +883,16 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
////////////////////////////////////////////////////////////////////////////////
TTransaction::TTransaction(
- IRawClientPtr rawClient,
+ const IRawClientPtr& rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& parentTransactionId,
const TStartTransactionOptions& options)
- : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy())
+ : TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
MakeHolder<TPingableTransaction>(
+ rawClient,
parentClient->GetRetryPolicy(),
context,
parentTransactionId,
@@ -871,15 +904,16 @@ TTransaction::TTransaction(
}
TTransaction::TTransaction(
- IRawClientPtr rawClient,
+ const IRawClientPtr& rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& transactionId,
const TAttachTransactionOptions& options)
- : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy())
+ : TClientBase(rawClient, context, transactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
new TPingableTransaction(
+ rawClient,
parentClient->GetRetryPolicy(),
context,
transactionId,
diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h
index 8a29997303b..a3ba768975c 100644
--- a/yt/cpp/mapreduce/client/client.h
+++ b/yt/cpp/mapreduce/client/client.h
@@ -292,7 +292,7 @@ public:
//
// Start a new transaction.
TTransaction(
- IRawClientPtr rawClient,
+ const IRawClientPtr& rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& parentTransactionId,
@@ -301,7 +301,7 @@ public:
//
// Attach an existing transaction.
TTransaction(
- IRawClientPtr rawClient,
+ const IRawClientPtr& rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& transactionId,
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index 0efa06ed3af..07f1ef0ad6b 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -8,6 +8,10 @@
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/http/helpers.h>
+#include <yt/cpp/mapreduce/http/requests.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/tvm.h>
@@ -16,10 +20,7 @@
#include <yt/cpp/mapreduce/io/helpers.h>
#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
-#include <yt/cpp/mapreduce/http/helpers.h>
-#include <yt/cpp/mapreduce/http/requests.h>
-#include <yt/cpp/mapreduce/http/retry_request.h>
-
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <library/cpp/yson/node/serialize.h>
@@ -38,6 +39,7 @@ using ::ToString;
TClientReader::TClientReader(
const TRichYPath& path,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
@@ -46,6 +48,7 @@ TClientReader::TClientReader(
const TTableReaderOptions& options,
bool useFormatFromTableAttributes)
: Path_(path)
+ , RawClient_(rawClient)
, ClientRetryPolicy_(std::move(clientRetryPolicy))
, Context_(context)
, ParentTransactionId_(transactionId)
@@ -56,12 +59,14 @@ TClientReader::TClientReader(
if (options.CreateTransaction_) {
Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null");
ReadTransaction_ = MakeHolder<TPingableTransaction>(
+ RawClient_,
ClientRetryPolicy_,
Context_,
transactionId,
transactionPinger->GetChildTxPinger(),
TStartTransactionOptions());
Path_.Path(Snapshot(
+ RawClient_,
ClientRetryPolicy_,
Context_,
ReadTransaction_->GetId(),
@@ -70,7 +75,7 @@ TClientReader::TClientReader(
if (useFormatFromTableAttributes) {
auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_;
- auto newFormat = GetTableFormat(ClientRetryPolicy_, Context_, transactionId2, Path_);
+ auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, transactionId2, Path_);
if (newFormat) {
Format_->Config = *newFormat;
}
diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h
index 782edb77b7a..61bc6983405 100644
--- a/yt/cpp/mapreduce/client/client_reader.h
+++ b/yt/cpp/mapreduce/client/client_reader.h
@@ -21,6 +21,7 @@ class TClientReader
public:
TClientReader(
const TRichYPath& path,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
@@ -43,8 +44,11 @@ protected:
private:
TRichYPath Path_;
+
+ const IRawClientPtr RawClient_;
const IClientRetryPolicyPtr ClientRetryPolicy_;
const TClientContext Context_;
+
TTransactionId ParentTransactionId_;
TMaybe<TFormat> Format_;
TTableReaderOptions Options_;
diff --git a/yt/cpp/mapreduce/client/client_writer.cpp b/yt/cpp/mapreduce/client/client_writer.cpp
index ee14ee0eb2c..84e0802ee23 100644
--- a/yt/cpp/mapreduce/client/client_writer.cpp
+++ b/yt/cpp/mapreduce/client/client_writer.cpp
@@ -1,19 +1,23 @@
#include "client_writer.h"
#include "retryful_writer.h"
-#include "retryless_writer.h"
#include "retryful_writer_v2.h"
+#include "retryless_writer.h"
-#include <yt/cpp/mapreduce/interface/io.h>
#include <yt/cpp/mapreduce/common/fwd.h>
#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
TClientWriter::TClientWriter(
const TRichYPath& path,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
@@ -38,6 +42,7 @@ TClientWriter::TClientWriter(
auto serializedWriterOptions = FormIORequestParameters(options);
RawWriter_ = MakeIntrusive<NPrivate::TRetryfulWriterV2>(
+ rawClient,
std::move(clientRetryPolicy),
std::move(transactionPinger),
context,
@@ -50,6 +55,7 @@ TClientWriter::TClientWriter(
options.CreateTransaction_);
} else {
RawWriter_.Reset(new TRetryfulWriter(
+ rawClient,
std::move(clientRetryPolicy),
std::move(transactionPinger),
context,
diff --git a/yt/cpp/mapreduce/client/client_writer.h b/yt/cpp/mapreduce/client/client_writer.h
index 19a49778172..b3aec26a59c 100644
--- a/yt/cpp/mapreduce/client/client_writer.h
+++ b/yt/cpp/mapreduce/client/client_writer.h
@@ -17,6 +17,7 @@ class TClientWriter
public:
TClientWriter(
const TRichYPath& path,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
index 57131e5330b..73d5de21f11 100644
--- a/yt/cpp/mapreduce/client/file_reader.cpp
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -19,6 +19,7 @@
#include <yt/cpp/mapreduce/http/http_client.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
namespace NYT {
@@ -39,13 +40,16 @@ static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
////////////////////////////////////////////////////////////////////////////////
TStreamReaderBase::TStreamReaderBase(
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
const TTransactionId& transactionId)
- : Context_(context)
+ : RawClient_(rawClient)
+ , Context_(context)
, ClientRetryPolicy_(std::move(clientRetryPolicy))
, ReadTransaction_(MakeHolder<TPingableTransaction>(
+ RawClient_,
ClientRetryPolicy_,
context,
transactionId,
@@ -57,7 +61,7 @@ TStreamReaderBase::~TStreamReaderBase() = default;
TYPath TStreamReaderBase::Snapshot(const TYPath& path)
{
- return NYT::Snapshot(ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path);
+ return NYT::Snapshot(RawClient_, ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path);
}
TString TStreamReaderBase::GetActiveRequestId() const
@@ -119,12 +123,13 @@ size_t TStreamReaderBase::DoRead(void* buf, size_t len)
TFileReader::TFileReader(
const TRichYPath& path,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
const TTransactionId& transactionId,
const TFileReaderOptions& options)
- : TStreamReaderBase(std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
+ : TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
, FileReaderOptions_(options)
, Path_(path)
, StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0))
@@ -183,12 +188,13 @@ NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context
TBlobTableReader::TBlobTableReader(
const TYPath& path,
const TKey& key,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr retryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
const TTransactionId& transactionId,
const TBlobTableReaderOptions& options)
- : TStreamReaderBase(std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
+ : TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
, Key_(key)
, Options_(options)
{
diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h
index d850008a312..48248696d34 100644
--- a/yt/cpp/mapreduce/client/file_reader.h
+++ b/yt/cpp/mapreduce/client/file_reader.h
@@ -22,6 +22,7 @@ class TStreamReaderBase
{
public:
TStreamReaderBase(
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
@@ -33,6 +34,7 @@ protected:
TYPath Snapshot(const TYPath& path);
protected:
+ const IRawClientPtr RawClient_;
const TClientContext Context_;
private:
@@ -60,6 +62,7 @@ class TFileReader
public:
TFileReader(
const TRichYPath& path,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
@@ -86,6 +89,7 @@ public:
TBlobTableReader(
const TYPath& path,
const TKey& key,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/client/file_writer.cpp b/yt/cpp/mapreduce/client/file_writer.cpp
index d273731e1d6..fcbb02bec92 100644
--- a/yt/cpp/mapreduce/client/file_writer.cpp
+++ b/yt/cpp/mapreduce/client/file_writer.cpp
@@ -1,9 +1,12 @@
#include "file_writer.h"
-#include <yt/cpp/mapreduce/io/helpers.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+
#include <yt/cpp/mapreduce/interface/finish_or_die.h>
-#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/io/helpers.h>
+
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
namespace NYT {
@@ -11,6 +14,7 @@ namespace NYT {
TFileWriter::TFileWriter(
const TRichYPath& path,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
@@ -18,6 +22,7 @@ TFileWriter::TFileWriter(
const TFileWriterOptions& options)
: AutoFinish_(options.AutoFinish_)
, RetryfulWriter_(
+ rawClient,
std::move(clientRetryPolicy),
std::move(transactionPinger),
context,
diff --git a/yt/cpp/mapreduce/client/file_writer.h b/yt/cpp/mapreduce/client/file_writer.h
index ae143a75bee..67246bfe25c 100644
--- a/yt/cpp/mapreduce/client/file_writer.h
+++ b/yt/cpp/mapreduce/client/file_writer.h
@@ -16,6 +16,7 @@ class TFileWriter
public:
TFileWriter(
const TRichYPath& path,
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 005bb507fa5..5bb67e3059f 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -319,6 +319,7 @@ TSimpleOperationIo CreateSimpleOperationIo(
TOperationPreparationContext(
inputs,
outputs,
+ preparer.GetClient()->GetRawClient(),
preparer.GetContext(),
preparer.GetClientRetryPolicy(),
preparer.GetTransactionId()),
@@ -491,6 +492,7 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
TOperationPreparationContext(
structuredInputs,
structuredOutputs,
+ preparer.GetClient()->GetRawClient(),
preparer.GetContext(),
preparer.GetClientRetryPolicy(),
preparer.GetTransactionId()),
@@ -499,7 +501,12 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
hints);
TVector<TSmallJobFile> formatConfigList;
- TFormatBuilder formatBuilder(preparer.GetClientRetryPolicy(), preparer.GetContext(), preparer.GetTransactionId(), options);
+ TFormatBuilder formatBuilder(
+ preparer.GetClient()->GetRawClient(),
+ preparer.GetClientRetryPolicy(),
+ preparer.GetContext(),
+ preparer.GetTransactionId(),
+ options);
auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
structuredJob,
@@ -587,11 +594,12 @@ EOperationBriefState CheckOperation(
void WaitForOperation(
const IClientRetryPolicyPtr& clientRetryPolicy,
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const TOperationId& operationId)
{
const TDuration checkOperationStateInterval =
- UseLocalModeOptimization(context, clientRetryPolicy)
+ UseLocalModeOptimization(rawClient, context, clientRetryPolicy)
? Min(TDuration::MilliSeconds(100), context.Config->OperationTrackerPollPeriod)
: context.Config->OperationTrackerPollPeriod;
@@ -1020,13 +1028,16 @@ void CheckInputTablesExist(
Y_ENSURE(!paths.empty(), "Input tables are not set");
for (auto& path : paths) {
auto curTransactionId = path.TransactionId_.GetOrElse(preparer.GetTransactionId());
+ auto exists = RequestWithRetry<bool>(
+ preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
+ [&preparer, &curTransactionId, &path] (TMutationId& mutationId) {
+ return preparer.GetClient()->GetRawClient()->Exists(
+ mutationId,
+ curTransactionId,
+ path.Path_);
+ });
Y_ENSURE_EX(
- path.Cluster_.Defined() ||
- Exists(
- preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- preparer.GetContext(),
- curTransactionId,
- path.Path_),
+ path.Cluster_.Defined() || exists,
TApiUsageError() << "Input table '" << path.Path_ << "' doesn't exist");
}
}
@@ -1633,6 +1644,7 @@ void ExecuteMapReduce(
VerifyHasElements(structuredInputs, "inputs");
TFormatBuilder formatBuilder(
+ preparer->GetClient()->GetRawClient(),
preparer->GetClientRetryPolicy(),
preparer->GetContext(),
preparer->GetTransactionId(),
@@ -1657,6 +1669,7 @@ void ExecuteMapReduce(
TOperationPreparationContext(
structuredInputs,
mapperOutput,
+ preparer->GetClient()->GetRawClient(),
preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
@@ -1726,6 +1739,7 @@ void ExecuteMapReduce(
TOperationPreparationContext(
inputs,
outputs,
+ preparer->GetClient()->GetRawClient(),
preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
@@ -1791,6 +1805,7 @@ void ExecuteMapReduce(
TOperationPreparationContext(
structuredInputs,
structuredOutputs,
+ preparer->GetClient()->GetRawClient(),
preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
diff --git a/yt/cpp/mapreduce/client/operation.h b/yt/cpp/mapreduce/client/operation.h
index 141161b0b72..f866c739361 100644
--- a/yt/cpp/mapreduce/client/operation.h
+++ b/yt/cpp/mapreduce/client/operation.h
@@ -185,6 +185,7 @@ EOperationBriefState CheckOperation(
void WaitForOperation(
const IClientRetryPolicyPtr& clientRetryPolicy,
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const TOperationId& operationId);
diff --git a/yt/cpp/mapreduce/client/operation_helpers.cpp b/yt/cpp/mapreduce/client/operation_helpers.cpp
index abb21856622..f307efaffea 100644
--- a/yt/cpp/mapreduce/client/operation_helpers.cpp
+++ b/yt/cpp/mapreduce/client/operation_helpers.cpp
@@ -2,15 +2,17 @@
#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/requests.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
-#include <yt/cpp/mapreduce/http/context.h>
-#include <yt/cpp/mapreduce/http/requests.h>
-
#include <util/string/builder.h>
#include <util/system/mutex.h>
@@ -26,7 +28,10 @@ ui64 RoundUpFileSize(ui64 size)
return (size + roundUpTo - 1) & ~(roundUpTo - 1);
}
-bool UseLocalModeOptimization(const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy)
+bool UseLocalModeOptimization(
+ const IRawClientPtr& rawClient,
+ const TClientContext& context,
+ const IClientRetryPolicyPtr& clientRetryPolicy)
{
if (!context.Config->EnableLocalModeOptimization) {
return false;
@@ -47,18 +52,26 @@ bool UseLocalModeOptimization(const TClientContext& context, const IClientRetryP
TString localModeAttr("//sys/@local_mode_fqdn");
// We don't want to pollute logs with errors about failed request,
// so we check if path exists before getting it.
- if (NRawClient::Exists(clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- TTransactionId(),
- localModeAttr,
- TExistsOptions().ReadFrom(EMasterReadKind::Cache)))
+ auto exists = RequestWithRetry<bool>(
+ clientRetryPolicy->CreatePolicyForGenericRequest(),
+ [&rawClient, &localModeAttr] (TMutationId& mutationId) {
+ return rawClient->Exists(
+ mutationId,
+ TTransactionId(),
+ localModeAttr,
+ TExistsOptions().ReadFrom(EMasterReadKind::Cache));
+ });
+ if (exists)
{
- auto fqdnNode = NRawClient::TryGet(
+ auto fqdnNode = RequestWithRetry<TNode>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- TTransactionId(),
- localModeAttr,
- TGetOptions().ReadFrom(EMasterReadKind::Cache));
+ [&rawClient, &localModeAttr] (TMutationId& mutationId) {
+ return rawClient->TryGet(
+ mutationId,
+ TTransactionId(),
+ localModeAttr,
+ TGetOptions().ReadFrom(EMasterReadKind::Cache));
+ });
if (!fqdnNode.IsUndefined()) {
auto fqdn = fqdnNode.AsString();
isLocalMode = (fqdn == TProcessState::Get()->FqdnHostName);
diff --git a/yt/cpp/mapreduce/client/operation_helpers.h b/yt/cpp/mapreduce/client/operation_helpers.h
index 7fd2ffb0de7..f7d25300b1a 100644
--- a/yt/cpp/mapreduce/client/operation_helpers.h
+++ b/yt/cpp/mapreduce/client/operation_helpers.h
@@ -11,7 +11,10 @@ namespace NYT::NDetail {
ui64 RoundUpFileSize(ui64 size);
-bool UseLocalModeOptimization(const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy);
+bool UseLocalModeOptimization(
+ const IRawClientPtr& rawClient,
+ const TClientContext& context,
+ const IClientRetryPolicyPtr& clientRetryPolicy);
TString GetOperationWebInterfaceUrl(TStringBuf serverName, TOperationId operationId);
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index a6d424c5a1d..eb7264425cf 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -142,6 +142,7 @@ TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transac
: Client_(std::move(client))
, TransactionId_(transactionId)
, FileTransaction_(MakeHolder<TPingableTransaction>(
+ Client_->GetRawClient(),
Client_->GetRetryPolicy(),
Client_->GetContext(),
TransactionId_,
@@ -609,6 +610,7 @@ TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) cons
{
TFileWriter writer(
uniquePath,
+ OperationPreparer_.GetClient()->GetRawClient(),
OperationPreparer_.GetClientRetryPolicy(),
OperationPreparer_.GetClient()->GetTransactionPinger(),
OperationPreparer_.GetContext(),
@@ -746,23 +748,29 @@ TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const
void TJobPreparer::UseFileInCypress(const TRichYPath& file)
{
- if (!Exists(
+ auto exists = RequestWithRetry<bool>(
OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- OperationPreparer_.GetContext(),
- file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
- file.Path_))
+ [this, &file] (TMutationId& mutationId) {
+ return RawClient_->Exists(
+ mutationId,
+ file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
+ file.Path_);
+ });
+ if (!exists)
{
ythrow yexception() << "File " << file.Path_ << " does not exist";
}
if (ShouldMountSandbox()) {
- auto size = Get(
+ auto size = RequestWithRetry<i64>(
OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- OperationPreparer_.GetContext(),
- file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
- file.Path_ + "/@uncompressed_data_size")
- .AsInt64();
-
+ [this, &file] (TMutationId& mutationId) {
+ return RawClient_->Get(
+ mutationId,
+ file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
+ file.Path_ + "/@uncompressed_data_size")
+ .AsInt64();
+ });
TotalFileSize_ += RoundUpFileSize(static_cast<ui64>(size));
}
CypressFiles_.push_back(file);
@@ -835,7 +843,10 @@ void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile)
bool TJobPreparer::IsLocalMode() const
{
- return UseLocalModeOptimization(OperationPreparer_.GetContext(), OperationPreparer_.GetClientRetryPolicy());
+ return UseLocalModeOptimization(
+ OperationPreparer_.GetClient()->GetRawClient(),
+ OperationPreparer_.GetContext(),
+ OperationPreparer_.GetClientRetryPolicy());
}
void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState)
diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp
index cd775f53577..f78d15f3e7e 100644
--- a/yt/cpp/mapreduce/client/prepare_operation.cpp
+++ b/yt/cpp/mapreduce/client/prepare_operation.cpp
@@ -2,6 +2,9 @@
#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/serialize.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
@@ -16,10 +19,12 @@ namespace NYT::NDetail {
TOperationPreparationContext::TOperationPreparationContext(
const TStructuredJobTableList& structuredInputs,
const TStructuredJobTableList& structuredOutputs,
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId)
- : Context_(context)
+ : RawClient_(rawClient)
+ , Context_(context)
, RetryPolicy_(retryPolicy)
, TransactionId_(transactionId)
, InputSchemas_(structuredInputs.size())
@@ -38,10 +43,12 @@ TOperationPreparationContext::TOperationPreparationContext(
TOperationPreparationContext::TOperationPreparationContext(
TVector<TRichYPath> inputs,
TVector<TRichYPath> outputs,
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId)
- : Context_(context)
+ : RawClient_(rawClient)
+ , Context_(context)
, RetryPolicy_(retryPolicy)
, TransactionId_(transactionId)
, InputSchemas_(inputs.size())
@@ -99,11 +106,11 @@ const TTableSchema& TOperationPreparationContext::GetInputSchema(int index) cons
auto& schema = InputSchemas_[index];
if (!InputSchemasLoaded_[index]) {
Y_ABORT_UNLESS(Inputs_[index]);
- auto schemaNode = NRawClient::Get(
+ auto schemaNode = RequestWithRetry<TNode>(
RetryPolicy_->CreatePolicyForGenericRequest(),
- Context_,
- TransactionId_,
- Inputs_[index]->Path_ + "/@schema");
+ [this, &index] (TMutationId& mutationId) {
+ return RawClient_->Get(mutationId, TransactionId_, Inputs_[index]->Path_ + "/@schema");
+ });
Deserialize(schema, schemaNode);
}
return schema;
diff --git a/yt/cpp/mapreduce/client/prepare_operation.h b/yt/cpp/mapreduce/client/prepare_operation.h
index 3b64aa28565..3fde1d1678e 100644
--- a/yt/cpp/mapreduce/client/prepare_operation.h
+++ b/yt/cpp/mapreduce/client/prepare_operation.h
@@ -15,6 +15,7 @@ public:
TOperationPreparationContext(
const TStructuredJobTableList& structuredInputs,
const TStructuredJobTableList& structuredOutputs,
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId);
@@ -22,6 +23,7 @@ public:
TOperationPreparationContext(
TVector<TRichYPath> inputs,
TVector<TRichYPath> outputs,
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId);
@@ -38,8 +40,11 @@ public:
private:
TVector<TMaybe<TRichYPath>> Inputs_;
TVector<TMaybe<TRichYPath>> Outputs_;
+
+ const IRawClientPtr RawClient_;
const TClientContext& Context_;
const IClientRetryPolicyPtr RetryPolicy_;
+
TTransactionId TransactionId_;
mutable TVector<TTableSchema> InputSchemas_;
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
index 253d7c0d44f..44c7db3a971 100644
--- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
@@ -25,6 +25,7 @@ using ::ToString;
////////////////////////////////////////////////////////////////////////////////
void RetryHeavyWriteRequest(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
const ITransactionPingerPtr& transactionPinger,
const TClientContext& context,
@@ -44,7 +45,7 @@ void RetryHeavyWriteRequest(
}
for (int attempt = 0; attempt < retryCount; ++attempt) {
- TPingableTransaction attemptTx(clientRetryPolicy, context, parentId, transactionPinger->GetChildTxPinger(), TStartTransactionOptions());
+ TPingableTransaction attemptTx(rawClient, clientRetryPolicy, context, parentId, transactionPinger->GetChildTxPinger(), TStartTransactionOptions());
auto input = streamMaker();
TString requestId;
@@ -167,6 +168,7 @@ void THeavyRequestRetrier::TryStartAttempt()
{
Attempt_ = std::make_unique<TAttempt>();
Attempt_->Transaction = std::make_unique<TPingableTransaction>(
+ Parameters_.RawClientPtr,
Parameters_.ClientRetryPolicy, Parameters_.Context,
Parameters_.TransactionId,
Parameters_.TransactionPinger->GetChildTxPinger(),
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.h b/yt/cpp/mapreduce/client/retry_heavy_write_request.h
index b3a6ec0f7c2..6933170e96d 100644
--- a/yt/cpp/mapreduce/client/retry_heavy_write_request.h
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.h
@@ -1,11 +1,14 @@
#pragma once
#include <yt/cpp/mapreduce/client/transaction.h>
+
#include <yt/cpp/mapreduce/common/fwd.h>
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -15,6 +18,7 @@ class THeavyRequestRetrier
public:
struct TParameters
{
+ IRawClientPtr RawClientPtr;
IClientRetryPolicyPtr ClientRetryPolicy;
ITransactionPingerPtr TransactionPinger;
TClientContext Context;
@@ -55,6 +59,7 @@ private:
////////////////////////////////////////////////////////////////////////////////
void RetryHeavyWriteRequest(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
const ITransactionPingerPtr& transactionPinger,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp
index 55165b17ffe..41ad1298cdd 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer.cpp
@@ -9,6 +9,8 @@
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
+
#include <util/generic/size_literals.h>
namespace NYT {
@@ -106,7 +108,7 @@ void TRetryfulWriter::Send(const TBuffer& buffer)
};
auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_);
- RetryHeavyWriteRequest(ClientRetryPolicy_, TransactionPinger_, Context_, transactionId, header, streamMaker);
+ RetryHeavyWriteRequest(RawClient_, ClientRetryPolicy_, TransactionPinger_, Context_, transactionId, header, streamMaker);
Parameters_ = SecondaryParameters_; // all blocks except the first one are appended
}
diff --git a/yt/cpp/mapreduce/client/retryful_writer.h b/yt/cpp/mapreduce/client/retryful_writer.h
index c2de332bffe..e49f9d5f1b3 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.h
+++ b/yt/cpp/mapreduce/client/retryful_writer.h
@@ -8,6 +8,7 @@
#include <yt/cpp/mapreduce/interface/common.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <yt/cpp/mapreduce/io/helpers.h>
+
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <library/cpp/threading/blocking_queue/blocking_queue.h>
@@ -28,6 +29,7 @@ class TRetryfulWriter
public:
template <class TWriterOptions>
TRetryfulWriter(
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
@@ -36,7 +38,8 @@ public:
const TMaybe<TFormat>& format,
const TRichYPath& path,
const TWriterOptions& options)
- : ClientRetryPolicy_(std::move(clientRetryPolicy))
+ : RawClient_(rawClient)
+ , ClientRetryPolicy_(std::move(clientRetryPolicy))
, TransactionPinger_(std::move(transactionPinger))
, Context_(context)
, AutoFinish_(options.AutoFinish_)
@@ -61,7 +64,7 @@ public:
SecondaryParameters_ = FormIORequestParameters(secondaryPath, options);
if (options.CreateTransaction_) {
- WriteTransaction_.ConstructInPlace(ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
+ WriteTransaction_.ConstructInPlace(rawClient, ClientRetryPolicy_, context, parentId, TransactionPinger_->GetChildTxPinger(), TStartTransactionOptions());
auto append = path.Append_.GetOrElse(false);
auto lockMode = (append ? LM_SHARED : LM_EXCLUSIVE);
NDetail::NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, WriteTransaction_->GetId(), path.Path_, lockMode);
@@ -89,12 +92,15 @@ private:
static size_t GetBufferSize(const TMaybe<TWriterOptions>& writerOptions);
private:
+ const IRawClientPtr RawClient_;
const IClientRetryPolicyPtr ClientRetryPolicy_;
const ITransactionPingerPtr TransactionPinger_;
const TClientContext Context_;
const bool AutoFinish_;
+
TString Command_;
TMaybe<TFormat> Format_;
+
const size_t BufferSize_;
TNode Parameters_;
diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
index 49f02e0405b..e574afe4401 100644
--- a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
@@ -291,6 +291,7 @@ struct TRetryfulWriterV2::TSendTask
////////////////////////////////////////////////////////////////////////////////
TRetryfulWriterV2::TRetryfulWriterV2(
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
@@ -311,6 +312,7 @@ TRetryfulWriterV2::TRetryfulWriterV2(
if (createTransaction) {
WriteTransaction_ = MakeHolder<TPingableTransaction>(
+ rawClient,
clientRetryPolicy,
context,
parentId,
@@ -329,6 +331,7 @@ TRetryfulWriterV2::TRetryfulWriterV2(
}
THeavyRequestRetrier::TParameters parameters = {
+ .RawClientPtr = rawClient,
.ClientRetryPolicy = clientRetryPolicy,
.TransactionPinger = transactionPinger,
.Context = context,
diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.h b/yt/cpp/mapreduce/client/retryful_writer_v2.h
index bda55d96a60..661ef5d0b58 100644
--- a/yt/cpp/mapreduce/client/retryful_writer_v2.h
+++ b/yt/cpp/mapreduce/client/retryful_writer_v2.h
@@ -18,6 +18,7 @@ class TRetryfulWriterV2
{
public:
TRetryfulWriterV2(
+ const IRawClientPtr& rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
ITransactionPingerPtr transactionPinger,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp
index 6dba1e14359..a2f47af3aeb 100644
--- a/yt/cpp/mapreduce/client/structured_table_formats.cpp
+++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp
@@ -5,19 +5,20 @@
#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <yt/cpp/mapreduce/interface/common.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
+
#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
#include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
-#include <yt/cpp/mapreduce/interface/common.h>
-
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <library/cpp/type_info/type_info.h>
#include <library/cpp/yson/writer.h>
-#include <memory>
-
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -66,15 +67,26 @@ TMaybe<TNode> GetCommonTableFormat(
TMaybe<TNode> GetTableFormat(
const IClientRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TTransactionId& transactionId,
const TRichYPath& path)
{
auto formatPath = path.Path_ + "/@_format";
- if (!NDetail::NRawClient::Exists(retryPolicy->CreatePolicyForGenericRequest(), context, transactionId, formatPath)) {
+
+ auto exists = NDetail::RequestWithRetry<bool>(
+ retryPolicy->CreatePolicyForGenericRequest(),
+ [&rawClient, &transactionId, &formatPath] (TMutationId& mutationId) {
+ return rawClient->Exists(mutationId, transactionId, formatPath);
+ });
+ if (!exists) {
return TMaybe<TNode>();
}
- TMaybe<TNode> format = NDetail::NRawClient::Get(retryPolicy->CreatePolicyForGenericRequest(), context, transactionId, formatPath);
+
+ auto format = NDetail::RequestWithRetry<TMaybe<TNode>>(
+ retryPolicy->CreatePolicyForGenericRequest(),
+ [&rawClient, &transactionId, &formatPath] (TMutationId& mutationId) {
+ return rawClient->Get(mutationId, transactionId, formatPath);
+ });
if (format.Get()->AsString() != "yamred_dsv") {
return TMaybe<TNode>();
}
@@ -90,13 +102,13 @@ TMaybe<TNode> GetTableFormat(
TMaybe<TNode> GetTableFormats(
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TTransactionId& transactionId,
const TVector<TRichYPath>& inputs)
{
TVector<TMaybe<TNode>> formats;
for (auto& table : inputs) {
- formats.push_back(GetTableFormat(clientRetryPolicy, context, transactionId, table));
+ formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, transactionId, table));
}
return GetCommonTableFormat(formats);
@@ -310,11 +322,13 @@ struct TFormatBuilder::TFormatSwitcher
};
TFormatBuilder::TFormatBuilder(
+ IRawClientPtr rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
TClientContext context,
TTransactionId transactionId,
TOperationOptions operationOptions)
- : ClientRetryPolicy_(std::move(clientRetryPolicy))
+ : RawClient_(std::move(rawClient))
+ , ClientRetryPolicy_(std::move(clientRetryPolicy))
, Context_(std::move(context))
, TransactionId_(transactionId)
, OperationOptions_(std::move(operationOptions))
@@ -376,7 +390,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat(
Y_ABORT_UNLESS(table.RichYPath, "Cannot use format from table for intermediate table");
tableList.push_back(*table.RichYPath);
}
- formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, Context_, TransactionId_, tableList);
+ formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, TransactionId_, tableList);
}
if (formatFromTableAttributes) {
return {
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.h b/yt/cpp/mapreduce/client/structured_table_formats.h
index 27d980c587a..162630d4110 100644
--- a/yt/cpp/mapreduce/client/structured_table_formats.h
+++ b/yt/cpp/mapreduce/client/structured_table_formats.h
@@ -21,13 +21,13 @@ TMaybe<TNode> GetCommonTableFormat(
TMaybe<TNode> GetTableFormat(
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TTransactionId& transactionId,
const TRichYPath& path);
TMaybe<TNode> GetTableFormats(
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TTransactionId& transactionId,
const TVector<TRichYPath>& paths);
@@ -84,6 +84,7 @@ private:
public:
TFormatBuilder(
+ IRawClientPtr rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
TClientContext context,
TTransactionId transactionId,
@@ -130,6 +131,7 @@ public:
bool allowFormatFromTableAttribute);
private:
+ const IRawClientPtr RawClient_;
const IClientRetryPolicyPtr ClientRetryPolicy_;
const TClientContext Context_;
const TTransactionId TransactionId_;
diff --git a/yt/cpp/mapreduce/client/transaction.cpp b/yt/cpp/mapreduce/client/transaction.cpp
index 0aa1a7a1c39..9daef9654da 100644
--- a/yt/cpp/mapreduce/client/transaction.cpp
+++ b/yt/cpp/mapreduce/client/transaction.cpp
@@ -2,15 +2,16 @@
#include "transaction_pinger.h"
-#include <yt/cpp/mapreduce/interface/config.h>
-#include <yt/cpp/mapreduce/interface/error_codes.h>
-
#include <yt/cpp/mapreduce/common/wait_proxy.h>
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/error_codes.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
+
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <util/datetime/base.h>
@@ -26,12 +27,14 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
TPingableTransaction::TPingableTransaction(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& retryPolicy,
const TClientContext& context,
const TTransactionId& parentId,
ITransactionPingerPtr transactionPinger,
const TStartTransactionOptions& options)
- : ClientRetryPolicy_(retryPolicy)
+ : RawClient_(rawClient)
+ , ClientRetryPolicy_(retryPolicy)
, Context_(context)
, AbortableRegistry_(NDetail::TAbortableRegistry::Get())
, AbortOnTermination_(true)
@@ -49,24 +52,29 @@ TPingableTransaction::TPingableTransaction(
}
TPingableTransaction::TPingableTransaction(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& retryPolicy,
const TClientContext& context,
const TTransactionId& transactionId,
ITransactionPingerPtr transactionPinger,
const TAttachTransactionOptions& options)
- : ClientRetryPolicy_(retryPolicy)
+ : RawClient_(rawClient)
+ , ClientRetryPolicy_(retryPolicy)
, Context_(context)
, AbortableRegistry_(NDetail::TAbortableRegistry::Get())
, AbortOnTermination_(options.AbortOnTermination_)
, AutoPingable_(options.AutoPingable_)
, Pinger_(std::move(transactionPinger))
{
- auto timeoutNode = NDetail::NRawClient::TryGet(
+ auto timeoutNode = NDetail::RequestWithRetry<TNode>(
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- context,
- TTransactionId(),
- "#" + GetGuidAsString(transactionId) + "/@timeout",
- TGetOptions());
+ [this, &transactionId] (TMutationId& mutationId) {
+ return RawClient_->TryGet(
+ mutationId,
+ TTransactionId(),
+ "#" + GetGuidAsString(transactionId) + "/@timeout",
+ TGetOptions());
+ });
if (timeoutNode.IsUndefined()) {
throw yexception() << "Transaction " << GetGuidAsString(transactionId) << " does not exist";
}
@@ -171,6 +179,7 @@ void TPingableTransaction::Stop(EStopAction action)
////////////////////////////////////////////////////////////////////////////////
TYPath Snapshot(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TClientContext& context,
const TTransactionId& transactionId,
@@ -182,11 +191,15 @@ TYPath Snapshot(
transactionId,
path,
ELockMode::LM_SNAPSHOT);
- auto lockedNodeId = NDetail::NRawClient::Get(
+
+ auto lockedNodeId = NDetail::RequestWithRetry<TNode>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- transactionId,
- ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id");
+ [&rawClient, &transactionId, &lockId] (TMutationId& mutationId) {
+ return rawClient->Get(
+ mutationId,
+ transactionId,
+ ::TStringBuilder() << '#' << GetGuidAsString(lockId) << "/@node_id");
+ });
return "#" + lockedNodeId.AsString();
}
diff --git a/yt/cpp/mapreduce/client/transaction.h b/yt/cpp/mapreduce/client/transaction.h
index a363020b246..c3a95462990 100644
--- a/yt/cpp/mapreduce/client/transaction.h
+++ b/yt/cpp/mapreduce/client/transaction.h
@@ -20,6 +20,7 @@ public:
//
// Start a new transaction.
TPingableTransaction(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& retryPolicy,
const TClientContext& context,
const TTransactionId& parentId,
@@ -29,6 +30,7 @@ public:
//
// Attach to an existing transaction.
TPingableTransaction(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& retryPolicy,
const TClientContext& context,
const TTransactionId& transactionId,
@@ -56,6 +58,8 @@ private:
};
private:
+ const IRawClientPtr RawClient_;
+
IClientRetryPolicyPtr ClientRetryPolicy_;
TClientContext Context_;
TTransactionId TransactionId_;
@@ -83,6 +87,7 @@ private:
////////////////////////////////////////////////////////////////////////////////
TYPath Snapshot(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TClientContext& context,
const TTransactionId& transactionId,
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index b1d244ad78f..85b7d1aa54e 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -12,12 +12,37 @@ class IRawClient
public:
// Cypress
+ virtual TNode Get(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options = {}) = 0;
+
+ virtual TNode TryGet(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options = {}) = 0;
+
virtual void Set(
TMutationId& mutationId,
const TTransactionId& transactionId,
const TYPath& path,
const TNode& value,
const TSetOptions& options = {}) = 0;
+
+ virtual bool Exists(
+ TMutationId& mutataionId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TExistsOptions& options = {}) = 0;
+
+ virtual void MultisetAttributes(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode::TMapType& value,
+ const TMultisetAttributesOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 9a8a9fca845..cc096823774 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -3,6 +3,7 @@
#include "rpc_parameters_serialization.h"
#include <yt/cpp/mapreduce/http/http.h>
+#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <library/cpp/yson/node/node_io.h>
@@ -15,6 +16,33 @@ THttpRawClient::THttpRawClient(const TClientContext& context)
: Context_(context)
{ }
+TNode THttpRawClient::Get(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ THttpHeader header("GET", "get");
+ header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options));
+ return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header).Response);
+}
+
+TNode THttpRawClient::TryGet(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options)
+{
+ try {
+ return Get(mutationId, transactionId, path, options);
+ } catch (const TErrorResponse& error) {
+ if (!error.IsResolveError()) {
+ throw;
+ }
+ return TNode();
+ }
+}
+
void THttpRawClient::Set(
TMutationId& mutationId,
const TTransactionId& transactionId,
@@ -28,6 +56,31 @@ void THttpRawClient::Set(
RequestWithoutRetry(Context_, mutationId, header, body);
}
+bool THttpRawClient::Exists(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TExistsOptions& options)
+{
+ THttpHeader header("GET", "exists");
+ header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options));
+ return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+}
+
+void THttpRawClient::MultisetAttributes(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode::TMapType& value,
+ const TMultisetAttributesOptions& options)
+{
+ THttpHeader header("PUT", "api/v4/multiset_attributes", false);
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options));
+ auto body = NodeToYsonString(value);
+ RequestWithoutRetry(Context_, mutationId, header, body);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index def521a9188..3b31aa519c4 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -17,6 +17,18 @@ public:
// Cypress
+ TNode Get(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options = {}) override;
+
+ TNode TryGet(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TGetOptions& options) override;
+
void Set(
TMutationId& mutationId,
const TTransactionId& transactionId,
@@ -24,6 +36,19 @@ public:
const TNode& value,
const TSetOptions& options = {}) override;
+ bool Exists(
+ TMutationId& mutataionId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TExistsOptions& options = {}) override;
+
+ void MultisetAttributes(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode::TMapType& value,
+ const TMultisetAttributesOptions& options = {}) override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index d10c818489c..13100a29070 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -79,35 +79,6 @@ void ExecuteBatch(
}
}
-TNode Get(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TGetOptions& options)
-{
- THttpHeader header("GET", "get");
- header.MergeParameters(SerializeParamsForGet(transactionId, context.Config->Prefix, path, options));
- return NodeFromYsonString(RetryRequestWithPolicy(retryPolicy, context, header).Response);
-}
-
-TNode TryGet(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TGetOptions& options)
-{
- try {
- return Get(retryPolicy, context, transactionId, path, options);
- } catch (const TErrorResponse& error) {
- if (!error.IsResolveError()) {
- throw;
- }
- return TNode();
- }
-}
-
void Set(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
@@ -123,34 +94,6 @@ void Set(
RetryRequestWithPolicy(retryPolicy, context, header, body);
}
-void MultisetAttributes(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TNode::TMapType& value,
- const TMultisetAttributesOptions& options)
-{
- THttpHeader header("PUT", "api/v4/multiset_attributes", false);
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForMultisetAttributes(transactionId, context.Config->Prefix, path, options));
-
- auto body = NodeToYsonString(value);
- RetryRequestWithPolicy(retryPolicy, context, header, body);
-}
-
-bool Exists(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TExistsOptions& options)
-{
- THttpHeader header("GET", "exists");
- header.MergeParameters(SerializeParamsForExists(transactionId, context.Config->Prefix, path, options));
- return ParseBoolFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
-}
-
TNodeId Create(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 8cb226ab862..96636808f37 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -39,35 +39,6 @@ void ExecuteBatch(
// Cypress
//
-TNode Get(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TGetOptions& options = TGetOptions());
-
-TNode TryGet(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TGetOptions& options);
-
-void MultisetAttributes(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TNode::TMapType& value,
- const TMultisetAttributesOptions& options = TMultisetAttributesOptions());
-
-bool Exists(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TExistsOptions& options = TExistsOptions());
-
TNodeId Create(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,