aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2025-01-15 20:17:09 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-01-15 21:21:14 +0300
commitbb37d56338bace93813adc51a61936a19acc8edc (patch)
tree3d771660421001c994e2baf9a40b409d06c7a24b /yt/cpp
parent9a04c0af35c6a47061b3699089da07acfcf14587 (diff)
downloadydb-bb37d56338bace93813adc51a61936a19acc8edc.tar.gz
YT-23616: Make THttpRawBatchRequest as an implementation of IRawBatchRequest interface
commit_hash:9e6c556686dda1562697762d38da532dc5c87b80
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/client/batch_request_impl.cpp6
-rw-r--r--yt/cpp/mapreduce/client/batch_request_impl.h8
-rw-r--r--yt/cpp/mapreduce/client/lock.cpp2
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp9
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp14
-rw-r--r--yt/cpp/mapreduce/client/prepare_operation.cpp10
-rw-r--r--yt/cpp/mapreduce/client/prepare_operation.h3
-rw-r--r--yt/cpp/mapreduce/client/skiff.cpp2
-rw-r--r--yt/cpp/mapreduce/client/yt_poller.cpp4
-rw-r--r--yt/cpp/mapreduce/client/yt_poller.h6
-rw-r--r--yt/cpp/mapreduce/interface/fwd.h7
-rw-r--r--yt/cpp/mapreduce/interface/raw_batch_request.h134
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h4
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.cpp52
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.h88
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp6
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h2
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp4
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h4
-rw-r--r--yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp2
20 files changed, 260 insertions, 107 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp
index f45eee319b..e7456b9187 100644
--- a/yt/cpp/mapreduce/client/batch_request_impl.cpp
+++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp
@@ -36,11 +36,11 @@ using ::NThreading::NewPromise;
TBatchRequest::TBatchRequest(const TTransactionId& defaultTransaction, ::TIntrusivePtr<TClient> client)
: DefaultTransaction_(defaultTransaction)
- , Impl_(MakeIntrusive<THttpRawBatchRequest>(client->GetContext().Config))
+ , Impl_(client->GetRawClient()->CreateRawBatchRequest())
, Client_(client)
{ }
-TBatchRequest::TBatchRequest(THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client)
+TBatchRequest::TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client)
: Impl_(impl)
, Client_(std::move(client))
{ }
@@ -189,7 +189,7 @@ TFuture<TCheckPermissionResponse> TBatchRequest::CheckPermission(
void TBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
{
- Impl_->ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), options);
+ Impl_->ExecuteBatch(options);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.h b/yt/cpp/mapreduce/client/batch_request_impl.h
index d53eb196dd..bbd993a925 100644
--- a/yt/cpp/mapreduce/client/batch_request_impl.h
+++ b/yt/cpp/mapreduce/client/batch_request_impl.h
@@ -22,10 +22,6 @@ struct TResponseInfo;
class TClient;
using TClientPtr = ::TIntrusivePtr<TClient>;
-namespace NRawClient {
- class THttpRawBatchRequest;
-}
-
////////////////////////////////////////////////////////////////////////////////
class TBatchRequest
@@ -119,11 +115,11 @@ public:
virtual void ExecuteBatch(const TExecuteBatchOptions& executeBatch) override;
private:
- TBatchRequest(NDetail::NRawClient::THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client);
+ TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client);
private:
TTransactionId DefaultTransaction_;
- ::TIntrusivePtr<NDetail::NRawClient::THttpRawBatchRequest> Impl_;
+ IRawBatchRequestPtr Impl_;
std::unique_ptr<TBatchRequest> TmpWithTransaction_;
::TIntrusivePtr<TClient> Client_;
diff --git a/yt/cpp/mapreduce/client/lock.cpp b/yt/cpp/mapreduce/client/lock.cpp
index 46306fa737..4eecfdb0d5 100644
--- a/yt/cpp/mapreduce/client/lock.cpp
+++ b/yt/cpp/mapreduce/client/lock.cpp
@@ -26,7 +26,7 @@ public:
, Acquired_(acquired)
{ }
- void PrepareRequest(THttpRawBatchRequest* batchRequest) override
+ void PrepareRequest(IRawBatchRequest* batchRequest) override
{
LockState_ = batchRequest->Get(TTransactionId(), LockStateYPath_, TGetOptions());
}
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 8ace8a44bb..5caf302358 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -228,7 +228,7 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
CreateDefaultRequestRetryPolicy(preparer.GetContext().Config),
preparer.GetContext(),
tableList,
- [&] (NRawClient::THttpRawBatchRequest& batch, const auto& table) {
+ [&] (IRawBatchRequest& batch, const auto& table) {
return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
});
@@ -318,7 +318,6 @@ TSimpleOperationIo CreateSimpleOperationIo(
inputs,
outputs,
preparer.GetClient()->GetRawClient(),
- preparer.GetContext(),
preparer.GetClientRetryPolicy(),
preparer.GetTransactionId()),
&inputs,
@@ -497,7 +496,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
structuredInputs,
structuredOutputs,
preparer.GetClient()->GetRawClient(),
- preparer.GetContext(),
preparer.GetClientRetryPolicy(),
preparer.GetTransactionId()),
&structuredInputs,
@@ -1688,7 +1686,6 @@ void ExecuteMapReduce(
structuredInputs,
mapperOutput,
preparer->GetClient()->GetRawClient(),
- preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
&structuredInputs,
@@ -1758,7 +1755,6 @@ void ExecuteMapReduce(
inputs,
outputs,
preparer->GetClient()->GetRawClient(),
- preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
&inputs,
@@ -1824,7 +1820,6 @@ void ExecuteMapReduce(
structuredInputs,
structuredOutputs,
preparer->GetClient()->GetRawClient(),
- preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
&structuredInputs,
@@ -2343,7 +2338,7 @@ public:
: OperationImpl_(std::move(operationImpl))
{ }
- void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override
+ void PrepareRequest(IRawBatchRequest* batchRequest) override
{
auto filter = TOperationAttributeFilter()
.Add(EOperationAttribute::State)
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index ac6ea0f923..1e5f81b5d9 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -42,7 +42,7 @@ public:
, Transaction_(std::move(transaction))
{ }
- void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override
+ void PrepareRequest(IRawBatchRequest* batchRequest) override
{
Future_ = batchRequest->GetOperation(
OperationId_,
@@ -213,26 +213,26 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
TVector<::NThreading::TFuture<TLockId>> lockIdFutures;
lockIdFutures.reserve(paths->size());
- NRawClient::THttpRawBatchRequest lockRequest(GetContext().Config);
+ auto lockRequest = Client_->GetRawClient()->CreateRawBatchRequest();
for (const auto& path : *paths) {
- lockIdFutures.push_back(lockRequest.Lock(
+ lockIdFutures.push_back(lockRequest->Lock(
FileTransaction_->GetId(),
path.Path_,
ELockMode::LM_SNAPSHOT,
TLockOptions().Waitable(true)));
}
- lockRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext());
+ lockRequest->ExecuteBatch();
TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
nodeIdFutures.reserve(paths->size());
- NRawClient::THttpRawBatchRequest getNodeIdRequest(GetContext().Config);
+ auto getNodeIdRequest = Client_->GetRawClient()->CreateRawBatchRequest();
for (const auto& lockIdFuture : lockIdFutures) {
- nodeIdFutures.push_back(getNodeIdRequest.Get(
+ nodeIdFutures.push_back(getNodeIdRequest->Get(
FileTransaction_->GetId(),
::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
TGetOptions()));
}
- getNodeIdRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext());
+ getNodeIdRequest->ExecuteBatch();
for (size_t i = 0; i != paths->size(); ++i) {
auto& richPath = (*paths)[i];
diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp
index 83695ef1e8..31574d7e0e 100644
--- a/yt/cpp/mapreduce/client/prepare_operation.cpp
+++ b/yt/cpp/mapreduce/client/prepare_operation.cpp
@@ -20,11 +20,9 @@ TOperationPreparationContext::TOperationPreparationContext(
const TStructuredJobTableList& structuredInputs,
const TStructuredJobTableList& structuredOutputs,
const IRawClientPtr& rawClient,
- const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId)
: RawClient_(rawClient)
- , Context_(context)
, RetryPolicy_(retryPolicy)
, TransactionId_(transactionId)
, InputSchemas_(structuredInputs.size())
@@ -44,11 +42,9 @@ TOperationPreparationContext::TOperationPreparationContext(
TVector<TRichYPath> inputs,
TVector<TRichYPath> outputs,
const IRawClientPtr& rawClient,
- const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId)
: RawClient_(rawClient)
- , Context_(context)
, RetryPolicy_(retryPolicy)
, TransactionId_(transactionId)
, InputSchemas_(inputs.size())
@@ -77,17 +73,17 @@ int TOperationPreparationContext::GetOutputCount() const
const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() const
{
TVector<::NThreading::TFuture<TNode>> schemaFutures;
- NRawClient::THttpRawBatchRequest batch(Context_.Config);
+ auto batch = RawClient_->CreateRawBatchRequest();
for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
if (InputSchemasLoaded_[tableIndex]) {
schemaFutures.emplace_back();
continue;
}
Y_ABORT_UNLESS(Inputs_[tableIndex]);
- schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
+ schemaFutures.push_back(batch->Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
}
- batch.ExecuteBatch(RetryPolicy_->CreatePolicyForGenericRequest(), Context_);
+ batch->ExecuteBatch();
for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
if (schemaFutures[tableIndex].Initialized()) {
diff --git a/yt/cpp/mapreduce/client/prepare_operation.h b/yt/cpp/mapreduce/client/prepare_operation.h
index 3fde1d1678..7b34703e89 100644
--- a/yt/cpp/mapreduce/client/prepare_operation.h
+++ b/yt/cpp/mapreduce/client/prepare_operation.h
@@ -16,7 +16,6 @@ public:
const TStructuredJobTableList& structuredInputs,
const TStructuredJobTableList& structuredOutputs,
const IRawClientPtr& rawClient,
- const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId);
@@ -24,7 +23,6 @@ public:
TVector<TRichYPath> inputs,
TVector<TRichYPath> outputs,
const IRawClientPtr& rawClient,
- const TClientContext& context,
const IClientRetryPolicyPtr& retryPolicy,
TTransactionId transactionId);
@@ -42,7 +40,6 @@ private:
TVector<TMaybe<TRichYPath>> Outputs_;
const IRawClientPtr RawClient_;
- const TClientContext& Context_;
const IClientRetryPolicyPtr RetryPolicy_;
TTransactionId TransactionId_;
diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp
index 499a1308fd..50ef7793ba 100644
--- a/yt/cpp/mapreduce/client/skiff.cpp
+++ b/yt/cpp/mapreduce/client/skiff.cpp
@@ -307,7 +307,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
clientRetryPolicy->CreatePolicyForGenericRequest(),
context,
NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
- [&] (THttpRawBatchRequest& batch, const TRichYPath& path) {
+ [&] (IRawBatchRequest& batch, const TRichYPath& path) {
auto getOptions = TGetOptions()
.AttributeFilter(
TAttributeFilter()
diff --git a/yt/cpp/mapreduce/client/yt_poller.cpp b/yt/cpp/mapreduce/client/yt_poller.cpp
index 13670fddb5..4980d287fa 100644
--- a/yt/cpp/mapreduce/client/yt_poller.cpp
+++ b/yt/cpp/mapreduce/client/yt_poller.cpp
@@ -92,14 +92,14 @@ void TYtPoller::WatchLoop()
Y_ABORT_UNLESS(!InProgress_.empty());
}
- THttpRawBatchRequest rawBatchRequest(Context_.Config);
+ THttpRawBatchRequest rawBatchRequest(Context_, ClientRetryPolicy_->CreatePolicyForGenericRequest());
for (auto& item : InProgress_) {
item->PrepareRequest(&rawBatchRequest);
}
try {
- rawBatchRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_);
+ rawBatchRequest.ExecuteBatch();
} catch (const std::exception& ex) {
YT_LOG_ERROR("Exception while executing batch request: %v", ex.what());
}
diff --git a/yt/cpp/mapreduce/client/yt_poller.h b/yt/cpp/mapreduce/client/yt_poller.h
index fe9979a6a9..c2e6809324 100644
--- a/yt/cpp/mapreduce/client/yt_poller.h
+++ b/yt/cpp/mapreduce/client/yt_poller.h
@@ -15,10 +15,6 @@
namespace NYT {
namespace NDetail {
-namespace NRawClient {
- class THttpRawBatchRequest;
-}
-
////////////////////////////////////////////////////////////////////////////////
class IYtPollerItem
@@ -33,7 +29,7 @@ public:
public:
virtual ~IYtPollerItem() = default;
- virtual void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) = 0;
+ virtual void PrepareRequest(IRawBatchRequest* batchRequest) = 0;
// Should return PollContinue if poller should continue polling this item.
// Should return PollBreak if poller should stop polling this item.
diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h
index 162a6ee3e9..9dcdf3789a 100644
--- a/yt/cpp/mapreduce/interface/fwd.h
+++ b/yt/cpp/mapreduce/interface/fwd.h
@@ -399,6 +399,13 @@ namespace NYT {
using IRetryConfigProviderPtr = ::TIntrusivePtr<IRetryConfigProvider>;
////////////////////////////////////////////////////////////////////////////////
+ // raw_batch_request.h
+ ////////////////////////////////////////////////////////////////////////////////
+
+ class IRawBatchRequest;
+ using IRawBatchRequestPtr = ::TIntrusivePtr<IRawBatchRequest>;
+
+ ////////////////////////////////////////////////////////////////////////////////
// raw_client.h
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/interface/raw_batch_request.h b/yt/cpp/mapreduce/interface/raw_batch_request.h
new file mode 100644
index 0000000000..ec7b2b542c
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/raw_batch_request.h
@@ -0,0 +1,134 @@
+#pragma once
+
+#include "client_method_options.h"
+#include "fwd.h"
+#include "operation.h"
+
+#include <library/cpp/threading/future/core/future.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class IRawBatchRequest
+ : public virtual TThrRefBase
+{
+public:
+ virtual void ExecuteBatch(const TExecuteBatchOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TNodeId> Create(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ ENodeType type,
+ const TCreateOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<void> Remove(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TRemoveOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<bool> Exists(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TExistsOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TNode> Get(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TGetOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<void> Set(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TNode& value,
+ const TSetOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TNode::TListType> List(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TListOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TNodeId> Copy(
+ const TTransactionId& transaction,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TCopyOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TNodeId> Move(
+ const TTransactionId& transaction,
+ const TYPath& sourcePath,
+ const TYPath& destinationPath,
+ const TMoveOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TNodeId> Link(
+ const TTransactionId& transaction,
+ const TYPath& targetPath,
+ const TYPath& linkPath,
+ const TLinkOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TLockId> Lock(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ ELockMode mode,
+ const TLockOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<void> Unlock(
+ const TTransactionId& transaction,
+ const TYPath& path,
+ const TUnlockOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache(
+ const TTransactionId& transactionId,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TGetFileFromCacheOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TYPath> PutFileToCache(
+ const TTransactionId& transactionId,
+ const TYPath& filePath,
+ const TString& md5Signature,
+ const TYPath& cachePath,
+ const TPutFileToCacheOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TCheckPermissionResponse> CheckPermission(
+ const TString& user,
+ EPermission permission,
+ const TYPath& path,
+ const TCheckPermissionOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TOperationAttributes> GetOperation(
+ const TOperationId& operationId,
+ const TGetOperationOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId) = 0;
+
+ virtual ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId) = 0;
+
+ virtual ::NThreading::TFuture<void> SuspendOperation(
+ const TOperationId& operationId,
+ const TSuspendOperationOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<void> ResumeOperation(
+ const TOperationId& operationId,
+ const TResumeOperationOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<void> UpdateOperationParameters(
+ const TOperationId& operationId,
+ const TUpdateOperationParametersOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path) = 0;
+
+ virtual ::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics(
+ const TTransactionId& transaction,
+ const TVector<TRichYPath>& paths,
+ const TGetTableColumnarStatisticsOptions& options = {}) = 0;
+
+ virtual ::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions(
+ const TTransactionId& transaction,
+ const TVector<TRichYPath>& paths,
+ const TGetTablePartitionsOptions& options = {}) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 1c140d1dd9..441e738287 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -327,6 +327,10 @@ public:
const TGetTablePartitionsOptions& options = {}) = 0;
virtual ui64 GenerateTimestamp() = 0;
+
+ // Batch
+
+ virtual IRawBatchRequestPtr CreateRawBatchRequest() = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
index 1a340b8c17..08fee25b79 100644
--- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
@@ -285,16 +285,14 @@ THttpRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInsta
////////////////////////////////////////////////////////////////////////////////
-THttpRawBatchRequest::THttpRawBatchRequest(const TConfigPtr& config)
- : Config_(config)
+THttpRawBatchRequest::THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy)
+ : Context_(context)
+ , RequestRetryPolicy_(std::move(retryPolicy))
{ }
THttpRawBatchRequest::~THttpRawBatchRequest() = default;
-void THttpRawBatchRequest::ExecuteBatch(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- const TExecuteBatchOptions& options)
+void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
{
if (IsExecuted()) {
ythrow yexception() << "Cannot execute batch request since it is already executed";
@@ -306,8 +304,8 @@ void THttpRawBatchRequest::ExecuteBatch(
const auto concurrency = options.Concurrency_.GetOrElse(50);
const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
- if (!retryPolicy) {
- retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
+ if (!RequestRetryPolicy_) {
+ RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config);
}
while (BatchSize()) {
@@ -324,9 +322,9 @@ void THttpRawBatchRequest::ExecuteBatch(
TResponseInfo result;
try {
result = RequestWithRetry<TResponseInfo>(
- retryPolicy,
- [&context, &header, &body] (TMutationId& mutationId) {
- auto response = RequestWithoutRetry(context, mutationId, header, body);
+ RequestRetryPolicy_,
+ [this, &header, &body] (TMutationId& mutationId) {
+ auto response = RequestWithoutRetry(Context_, mutationId, header, body);
return TResponseInfo{
.RequestId = response->GetRequestId(),
.Response = response->GetResponse(),
@@ -337,7 +335,7 @@ void THttpRawBatchRequest::ExecuteBatch(
SetErrorResult(std::current_exception());
throw;
}
- ParseResponse(std::move(result), retryPolicy.Get());
+ ParseResponse(std::move(result), RequestRetryPolicy_.Get());
}
}
@@ -392,7 +390,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Create(
{
return AddRequest<TGuidResponseParser>(
"create",
- SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options),
+ SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options),
Nothing());
}
@@ -403,7 +401,7 @@ TFuture<void> THttpRawBatchRequest::Remove(
{
return AddRequest<TVoidResponseParser>(
"remove",
- SerializeParamsForRemove(transaction, Config_->Prefix, path, options),
+ SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -414,7 +412,7 @@ TFuture<bool> THttpRawBatchRequest::Exists(
{
return AddRequest<TExistsResponseParser>(
"exists",
- SerializeParamsForExists(transaction, Config_->Prefix, path, options),
+ SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -425,7 +423,7 @@ TFuture<TNode> THttpRawBatchRequest::Get(
{
return AddRequest<TGetResponseParser>(
"get",
- SerializeParamsForGet(transaction, Config_->Prefix, path, options),
+ SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -437,7 +435,7 @@ TFuture<void> THttpRawBatchRequest::Set(
{
return AddRequest<TVoidResponseParser>(
"set",
- SerializeParamsForSet(transaction, Config_->Prefix, path, options),
+ SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options),
node);
}
@@ -448,7 +446,7 @@ TFuture<TNode::TListType> THttpRawBatchRequest::List(
{
return AddRequest<TListResponseParser>(
"list",
- SerializeParamsForList(transaction, Config_->Prefix, path, options),
+ SerializeParamsForList(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -460,7 +458,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Copy(
{
return AddRequest<TGuidResponseParser>(
"copy",
- SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options),
+ SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options),
Nothing());
}
@@ -472,7 +470,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Move(
{
return AddRequest<TGuidResponseParser>(
"move",
- SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options),
+ SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options),
Nothing());
}
@@ -484,7 +482,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Link(
{
return AddRequest<TGuidResponseParser>(
"link",
- SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options),
+ SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options),
Nothing());
}
@@ -496,7 +494,7 @@ TFuture<TLockId> THttpRawBatchRequest::Lock(
{
return AddRequest<TGuidResponseParser>(
"lock",
- SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options),
+ SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options),
Nothing());
}
@@ -507,7 +505,7 @@ TFuture<void> THttpRawBatchRequest::Unlock(
{
return AddRequest<TVoidResponseParser>(
"unlock",
- SerializeParamsForUnlock(transaction, Config_->Prefix, path, options),
+ SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -532,7 +530,7 @@ TFuture<TYPath> THttpRawBatchRequest::PutFileToCache(
{
return AddRequest<TYPathParser>(
"put_file_to_cache",
- SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options),
+ SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options),
Nothing());
}
@@ -544,7 +542,7 @@ TFuture<TCheckPermissionResponse> THttpRawBatchRequest::CheckPermission(
{
return AddRequest<TCheckPermissionParser>(
"check_permission",
- SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options),
+ SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -607,7 +605,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path)
TRichYPath result = path;
// Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath.
if (!result.Path_.StartsWith("<")) {
- result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix);
+ result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix);
}
if (result.Path_.find_first_of("<>{}[]") != TString::npos) {
@@ -615,7 +613,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path)
"parse_ypath",
SerializeParamsForParseYPath(result),
Nothing(),
- MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, result));
+ MakeIntrusive<TCanonizeYPathResponseParser>(Context_.Config->Prefix, result));
}
return NThreading::MakeFuture(result);
}
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.h b/yt/cpp/mapreduce/raw_client/raw_batch_request.h
index 670b97a843..2af0e31305 100644
--- a/yt/cpp/mapreduce/raw_client/raw_batch_request.h
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.h
@@ -2,13 +2,14 @@
#include <yt/cpp/mapreduce/common/fwd.h>
-#include <yt/cpp/mapreduce/interface/batch_request.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/requests.h>
+
#include <yt/cpp/mapreduce/interface/fwd.h>
#include <yt/cpp/mapreduce/interface/node.h>
+#include <yt/cpp/mapreduce/interface/raw_batch_request.h>
#include <yt/cpp/mapreduce/interface/retry_policy.h>
-#include <yt/cpp/mapreduce/http/requests.h>
-
#include <library/cpp/threading/future/future.h>
#include <util/generic/ptr.h>
@@ -25,7 +26,7 @@ namespace NYT::NDetail::NRawClient {
////////////////////////////////////////////////////////////////////////////////
class THttpRawBatchRequest
- : public TThrRefBase
+ : public IRawBatchRequest
{
public:
struct IResponseItemParser
@@ -38,13 +39,10 @@ public:
};
public:
- THttpRawBatchRequest(const TConfigPtr& config);
+ THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy);
~THttpRawBatchRequest();
- void ExecuteBatch(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- const TExecuteBatchOptions& options = {});
+ void ExecuteBatch(const TExecuteBatchOptions& options = {}) override;
bool IsExecuted() const;
void MarkExecuted();
@@ -68,91 +66,113 @@ public:
const TTransactionId& transaction,
const TYPath& path,
ENodeType type,
- const TCreateOptions& options);
+ const TCreateOptions& options = {}) override;
+
::NThreading::TFuture<void> Remove(
const TTransactionId& transaction,
const TYPath& path,
- const TRemoveOptions& options);
+ const TRemoveOptions& options = {}) override;
+
::NThreading::TFuture<bool> Exists(
const TTransactionId& transaction,
const TYPath& path,
- const TExistsOptions& options);
+ const TExistsOptions& options = {}) override;
+
::NThreading::TFuture<TNode> Get(
const TTransactionId& transaction,
const TYPath& path,
- const TGetOptions& options);
+ const TGetOptions& options = {}) override;
+
::NThreading::TFuture<void> Set(
const TTransactionId& transaction,
const TYPath& path,
const TNode& value,
- const TSetOptions& options);
+ const TSetOptions& options = {}) override;
+
::NThreading::TFuture<TNode::TListType> List(
const TTransactionId& transaction,
const TYPath& path,
- const TListOptions& options);
+ const TListOptions& options = {}) override;
+
::NThreading::TFuture<TNodeId> Copy(
const TTransactionId& transaction,
const TYPath& sourcePath,
const TYPath& destinationPath,
- const TCopyOptions& options);
+ const TCopyOptions& options = {}) override;
+
::NThreading::TFuture<TNodeId> Move(
const TTransactionId& transaction,
const TYPath& sourcePath,
const TYPath& destinationPath,
- const TMoveOptions& options);
+ const TMoveOptions& options = {}) override;
+
::NThreading::TFuture<TNodeId> Link(
const TTransactionId& transaction,
const TYPath& targetPath,
const TYPath& linkPath,
- const TLinkOptions& options);
+ const TLinkOptions& options = {}) override;
+
::NThreading::TFuture<TLockId> Lock(
const TTransactionId& transaction,
const TYPath& path,
ELockMode mode,
- const TLockOptions& options);
+ const TLockOptions& options = {}) override;
+
::NThreading::TFuture<void> Unlock(
const TTransactionId& transaction,
const TYPath& path,
- const TUnlockOptions& options);
+ const TUnlockOptions& options = {}) override;
+
::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache(
const TTransactionId& transactionId,
const TString& md5Signature,
const TYPath& cachePath,
- const TGetFileFromCacheOptions& options);
+ const TGetFileFromCacheOptions& options = {}) override;
+
::NThreading::TFuture<TYPath> PutFileToCache(
const TTransactionId& transactionId,
const TYPath& filePath,
const TString& md5Signature,
const TYPath& cachePath,
- const TPutFileToCacheOptions& options);
+ const TPutFileToCacheOptions& options = {}) override;
+
::NThreading::TFuture<TCheckPermissionResponse> CheckPermission(
const TString& user,
EPermission permission,
const TYPath& path,
- const TCheckPermissionOptions& options);
+ const TCheckPermissionOptions& options = {}) override;
+
::NThreading::TFuture<TOperationAttributes> GetOperation(
const TOperationId& operationId,
- const TGetOperationOptions& options);
- ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId);
- ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId);
+ const TGetOperationOptions& options = {}) override;
+
+ ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId) override;
+
+ ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId) override;
+
::NThreading::TFuture<void> SuspendOperation(
const TOperationId& operationId,
- const TSuspendOperationOptions& options);
+ const TSuspendOperationOptions& options = {}) override;
+
::NThreading::TFuture<void> ResumeOperation(
const TOperationId& operationId,
- const TResumeOperationOptions& options);
+ const TResumeOperationOptions& options = {}) override;
+
::NThreading::TFuture<void> UpdateOperationParameters(
const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options);
- ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path);
+ const TUpdateOperationParametersOptions& options = {}) override;
+
+ ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path) override;
+
::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics(
const TTransactionId& transaction,
const TVector<TRichYPath>& paths,
- const TGetTableColumnarStatisticsOptions& options);
+ const TGetTableColumnarStatisticsOptions& options = {}) override;
+
::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions(
const TTransactionId& transaction,
const TVector<TRichYPath>& paths,
- const TGetTablePartitionsOptions& options);
+ const TGetTablePartitionsOptions& options = {}) override;
private:
struct TBatchItem {
@@ -182,7 +202,9 @@ private:
void AddRequest(TBatchItem batchItem);
private:
- TConfigPtr Config_;
+ const TClientContext Context_;
+
+ IRequestRetryPolicyPtr RequestRetryPolicy_;
TDeque<TBatchItem> BatchItemList_;
bool Executed_ = false;
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 53d2be114a..f25fe5a4ee 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -11,6 +11,7 @@
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <yt/cpp/mapreduce/interface/fluent.h>
+#include <yt/cpp/mapreduce/interface/fwd.h>
#include <yt/cpp/mapreduce/interface/operation.h>
#include <yt/cpp/mapreduce/interface/tvm.h>
@@ -925,6 +926,11 @@ ui64 THttpRawClient::GenerateTimestamp()
return NodeFromYsonString(responseInfo->GetResponse()).AsUint64();
}
+IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest()
+{
+ return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index e18e32a92e..f0e8378db3 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -335,6 +335,8 @@ public:
ui64 GenerateTimestamp() override;
+ IRawBatchRequestPtr CreateRawBatchRequest() override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index fd5c59ec20..6ba8a3f084 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -265,13 +265,13 @@ TVector<TRichYPath> CanonizeYPaths(
const TClientContext& context,
const TVector<TRichYPath>& paths)
{
- THttpRawBatchRequest batch(context.Config);
+ THttpRawBatchRequest batch(context, retryPolicy);
TVector<NThreading::TFuture<TRichYPath>> futures;
futures.reserve(paths.size());
for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
futures.push_back(batch.CanonizeYPath(paths[i]));
}
- batch.ExecuteBatch(retryPolicy, context);
+ batch.ExecuteBatch();
TVector<TRichYPath> result;
result.reserve(futures.size());
for (auto& future : futures) {
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 860d4ae939..9b37293b12 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -59,13 +59,13 @@ auto BatchTransform(
TBatchAdder batchAdder,
const TExecuteBatchOptions& executeBatchOptions = {})
{
- THttpRawBatchRequest batch(context.Config);
+ THttpRawBatchRequest batch(context, retryPolicy);
using TFuture = decltype(batchAdder(batch, *std::begin(src)));
TVector<TFuture> futures;
for (const auto& el : src) {
futures.push_back(batchAdder(batch, el));
}
- batch.ExecuteBatch(retryPolicy, context, executeBatchOptions);
+ batch.ExecuteBatch(executeBatchOptions);
using TDst = decltype(futures[0].ExtractValueSync());
TVector<TDst> result;
result.reserve(std::size(src));
diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
index 36a6935e82..1871ee80fb 100644
--- a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
+++ b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
@@ -73,7 +73,7 @@ TVector<TString> GetAllPathsFromRequestList(const TNode& requestList)
TEST(TBatchRequestImplTest, ParseResponse) {
TClientContext context;
- THttpRawBatchRequest batchRequest(context.Config);
+ THttpRawBatchRequest batchRequest(context, /*retryPolicy*/ nullptr);
EXPECT_EQ(batchRequest.BatchSize(), 0u);