summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp60
1 files changed, 47 insertions, 13 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index a2d302212f1..24aed7fb77f 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -126,14 +126,22 @@ bool TClientBase::Exists(
const TYPath& path,
const TExistsOptions& options)
{
- return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
+ return RequestWithRetry<bool>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &options] (TMutationId& mutationId) {
+ return RawClient_->Exists(mutationId, TransactionId_, path, options);
+ });
}
TNode TClientBase::Get(
const TYPath& path,
const TGetOptions& options)
{
- return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
+ return RequestWithRetry<TNode>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &options] (TMutationId& mutationId) {
+ return RawClient_->Get(mutationId, TransactionId_, path, options);
+ });
}
void TClientBase::Set(
@@ -149,12 +157,17 @@ void TClientBase::Set(
}
void TClientBase::MultisetAttributes(
- const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options)
+ const TYPath& path,
+ const TNode::TMapType& value,
+ const TMultisetAttributesOptions& options)
{
- NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &value, &options] (TMutationId& mutationId) {
+ RawClient_->MultisetAttributes(mutationId, TransactionId_, path, value, options);
+ });
}
-
TNode::TListType TClientBase::List(
const TYPath& path,
const TListOptions& options)
@@ -290,6 +303,7 @@ IFileReaderPtr TClientBase::CreateBlobTableReader(
return new TBlobTableReader(
path,
key,
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -303,6 +317,7 @@ IFileReaderPtr TClientBase::CreateFileReader(
{
return new TFileReader(
CanonizeYPath(path),
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -315,11 +330,18 @@ IFileWriterPtr TClientBase::CreateFileWriter(
const TFileWriterOptions& options)
{
auto realPath = CanonizeYPath(path);
- if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
+
+ auto exists = RequestWithRetry<bool>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &realPath] (TMutationId& mutationId) {
+ return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_);
+ });
+ if (!exists) {
NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE,
TCreateOptions().IgnoreExisting(true));
}
- return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
+
+ return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
}
TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
@@ -343,6 +365,7 @@ TRawTableWriterPtr TClientBase::CreateRawWriter(
const TTableWriterOptions& options)
{
return ::MakeIntrusive<TRetryfulWriter>(
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -673,7 +696,7 @@ void TClientBase::CompleteOperation(const TOperationId& operationId)
void TClientBase::WaitForOperation(const TOperationId& operationId)
{
- NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId);
+ NYT::NDetail::WaitForOperation(ClientRetryPolicy_, RawClient_, Context_, operationId);
}
void TClientBase::AlterTable(
@@ -691,6 +714,7 @@ void TClientBase::AlterTable(
{
return ::MakeIntrusive<TClientReader>(
CanonizeYPath(path),
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -706,12 +730,20 @@ THolder<TClientWriter> TClientBase::CreateClientWriter(
const TTableWriterOptions& options)
{
auto realPath = CanonizeYPath(path);
- if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
+
+ auto exists = RequestWithRetry<bool>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &realPath] (TMutationId& mutationId) {
+ return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_);
+ });
+ if (!exists) {
NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE,
TCreateOptions().IgnoreExisting(true));
}
+
return MakeHolder<TClientWriter>(
realPath,
+ RawClient_,
ClientRetryPolicy_,
GetTransactionPinger(),
Context_,
@@ -851,15 +883,16 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
////////////////////////////////////////////////////////////////////////////////
TTransaction::TTransaction(
- IRawClientPtr rawClient,
+ const IRawClientPtr& rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& parentTransactionId,
const TStartTransactionOptions& options)
- : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy())
+ : TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
MakeHolder<TPingableTransaction>(
+ rawClient,
parentClient->GetRetryPolicy(),
context,
parentTransactionId,
@@ -871,15 +904,16 @@ TTransaction::TTransaction(
}
TTransaction::TTransaction(
- IRawClientPtr rawClient,
+ const IRawClientPtr& rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& transactionId,
const TAttachTransactionOptions& options)
- : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy())
+ : TClientBase(rawClient, context, transactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
new TPingableTransaction(
+ rawClient,
parentClient->GetRetryPolicy(),
context,
transactionId,