aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/raw_client
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2025-01-15 20:17:09 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-01-15 21:21:14 +0300
commitbb37d56338bace93813adc51a61936a19acc8edc (patch)
tree3d771660421001c994e2baf9a40b409d06c7a24b /yt/cpp/mapreduce/raw_client
parent9a04c0af35c6a47061b3699089da07acfcf14587 (diff)
downloadydb-bb37d56338bace93813adc51a61936a19acc8edc.tar.gz
YT-23616: Make THttpRawBatchRequest as an implementation of IRawBatchRequest interface
commit_hash:9e6c556686dda1562697762d38da532dc5c87b80
Diffstat (limited to 'yt/cpp/mapreduce/raw_client')
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.cpp52
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.h88
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp6
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h2
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp4
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h4
-rw-r--r--yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp2
7 files changed, 93 insertions, 65 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
index 1a340b8c17..08fee25b79 100644
--- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
@@ -285,16 +285,14 @@ THttpRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInsta
////////////////////////////////////////////////////////////////////////////////
-THttpRawBatchRequest::THttpRawBatchRequest(const TConfigPtr& config)
- : Config_(config)
+THttpRawBatchRequest::THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy)
+ : Context_(context)
+ , RequestRetryPolicy_(std::move(retryPolicy))
{ }
THttpRawBatchRequest::~THttpRawBatchRequest() = default;
-void THttpRawBatchRequest::ExecuteBatch(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- const TExecuteBatchOptions& options)
+void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
{
if (IsExecuted()) {
ythrow yexception() << "Cannot execute batch request since it is already executed";
@@ -306,8 +304,8 @@ void THttpRawBatchRequest::ExecuteBatch(
const auto concurrency = options.Concurrency_.GetOrElse(50);
const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
- if (!retryPolicy) {
- retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
+ if (!RequestRetryPolicy_) {
+ RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config);
}
while (BatchSize()) {
@@ -324,9 +322,9 @@ void THttpRawBatchRequest::ExecuteBatch(
TResponseInfo result;
try {
result = RequestWithRetry<TResponseInfo>(
- retryPolicy,
- [&context, &header, &body] (TMutationId& mutationId) {
- auto response = RequestWithoutRetry(context, mutationId, header, body);
+ RequestRetryPolicy_,
+ [this, &header, &body] (TMutationId& mutationId) {
+ auto response = RequestWithoutRetry(Context_, mutationId, header, body);
return TResponseInfo{
.RequestId = response->GetRequestId(),
.Response = response->GetResponse(),
@@ -337,7 +335,7 @@ void THttpRawBatchRequest::ExecuteBatch(
SetErrorResult(std::current_exception());
throw;
}
- ParseResponse(std::move(result), retryPolicy.Get());
+ ParseResponse(std::move(result), RequestRetryPolicy_.Get());
}
}
@@ -392,7 +390,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Create(
{
return AddRequest<TGuidResponseParser>(
"create",
- SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options),
+ SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options),
Nothing());
}
@@ -403,7 +401,7 @@ TFuture<void> THttpRawBatchRequest::Remove(
{
return AddRequest<TVoidResponseParser>(
"remove",
- SerializeParamsForRemove(transaction, Config_->Prefix, path, options),
+ SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -414,7 +412,7 @@ TFuture<bool> THttpRawBatchRequest::Exists(
{
return AddRequest<TExistsResponseParser>(
"exists",
- SerializeParamsForExists(transaction, Config_->Prefix, path, options),
+ SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -425,7 +423,7 @@ TFuture<TNode> THttpRawBatchRequest::Get(
{
return AddRequest<TGetResponseParser>(
"get",
- SerializeParamsForGet(transaction, Config_->Prefix, path, options),
+ SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -437,7 +435,7 @@ TFuture<void> THttpRawBatchRequest::Set(
{
return AddRequest<TVoidResponseParser>(
"set",
- SerializeParamsForSet(transaction, Config_->Prefix, path, options),
+ SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options),
node);
}
@@ -448,7 +446,7 @@ TFuture<TNode::TListType> THttpRawBatchRequest::List(
{
return AddRequest<TListResponseParser>(
"list",
- SerializeParamsForList(transaction, Config_->Prefix, path, options),
+ SerializeParamsForList(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -460,7 +458,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Copy(
{
return AddRequest<TGuidResponseParser>(
"copy",
- SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options),
+ SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options),
Nothing());
}
@@ -472,7 +470,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Move(
{
return AddRequest<TGuidResponseParser>(
"move",
- SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options),
+ SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options),
Nothing());
}
@@ -484,7 +482,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Link(
{
return AddRequest<TGuidResponseParser>(
"link",
- SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options),
+ SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options),
Nothing());
}
@@ -496,7 +494,7 @@ TFuture<TLockId> THttpRawBatchRequest::Lock(
{
return AddRequest<TGuidResponseParser>(
"lock",
- SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options),
+ SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options),
Nothing());
}
@@ -507,7 +505,7 @@ TFuture<void> THttpRawBatchRequest::Unlock(
{
return AddRequest<TVoidResponseParser>(
"unlock",
- SerializeParamsForUnlock(transaction, Config_->Prefix, path, options),
+ SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -532,7 +530,7 @@ TFuture<TYPath> THttpRawBatchRequest::PutFileToCache(
{
return AddRequest<TYPathParser>(
"put_file_to_cache",
- SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options),
+ SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options),
Nothing());
}
@@ -544,7 +542,7 @@ TFuture<TCheckPermissionResponse> THttpRawBatchRequest::CheckPermission(
{
return AddRequest<TCheckPermissionParser>(
"check_permission",
- SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options),
+ SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options),
Nothing());
}
@@ -607,7 +605,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path)
TRichYPath result = path;
// Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath.
if (!result.Path_.StartsWith("<")) {
- result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix);
+ result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix);
}
if (result.Path_.find_first_of("<>{}[]") != TString::npos) {
@@ -615,7 +613,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path)
"parse_ypath",
SerializeParamsForParseYPath(result),
Nothing(),
- MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, result));
+ MakeIntrusive<TCanonizeYPathResponseParser>(Context_.Config->Prefix, result));
}
return NThreading::MakeFuture(result);
}
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.h b/yt/cpp/mapreduce/raw_client/raw_batch_request.h
index 670b97a843..2af0e31305 100644
--- a/yt/cpp/mapreduce/raw_client/raw_batch_request.h
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.h
@@ -2,13 +2,14 @@
#include <yt/cpp/mapreduce/common/fwd.h>
-#include <yt/cpp/mapreduce/interface/batch_request.h>
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/http/requests.h>
+
#include <yt/cpp/mapreduce/interface/fwd.h>
#include <yt/cpp/mapreduce/interface/node.h>
+#include <yt/cpp/mapreduce/interface/raw_batch_request.h>
#include <yt/cpp/mapreduce/interface/retry_policy.h>
-#include <yt/cpp/mapreduce/http/requests.h>
-
#include <library/cpp/threading/future/future.h>
#include <util/generic/ptr.h>
@@ -25,7 +26,7 @@ namespace NYT::NDetail::NRawClient {
////////////////////////////////////////////////////////////////////////////////
class THttpRawBatchRequest
- : public TThrRefBase
+ : public IRawBatchRequest
{
public:
struct IResponseItemParser
@@ -38,13 +39,10 @@ public:
};
public:
- THttpRawBatchRequest(const TConfigPtr& config);
+ THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy);
~THttpRawBatchRequest();
- void ExecuteBatch(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- const TExecuteBatchOptions& options = {});
+ void ExecuteBatch(const TExecuteBatchOptions& options = {}) override;
bool IsExecuted() const;
void MarkExecuted();
@@ -68,91 +66,113 @@ public:
const TTransactionId& transaction,
const TYPath& path,
ENodeType type,
- const TCreateOptions& options);
+ const TCreateOptions& options = {}) override;
+
::NThreading::TFuture<void> Remove(
const TTransactionId& transaction,
const TYPath& path,
- const TRemoveOptions& options);
+ const TRemoveOptions& options = {}) override;
+
::NThreading::TFuture<bool> Exists(
const TTransactionId& transaction,
const TYPath& path,
- const TExistsOptions& options);
+ const TExistsOptions& options = {}) override;
+
::NThreading::TFuture<TNode> Get(
const TTransactionId& transaction,
const TYPath& path,
- const TGetOptions& options);
+ const TGetOptions& options = {}) override;
+
::NThreading::TFuture<void> Set(
const TTransactionId& transaction,
const TYPath& path,
const TNode& value,
- const TSetOptions& options);
+ const TSetOptions& options = {}) override;
+
::NThreading::TFuture<TNode::TListType> List(
const TTransactionId& transaction,
const TYPath& path,
- const TListOptions& options);
+ const TListOptions& options = {}) override;
+
::NThreading::TFuture<TNodeId> Copy(
const TTransactionId& transaction,
const TYPath& sourcePath,
const TYPath& destinationPath,
- const TCopyOptions& options);
+ const TCopyOptions& options = {}) override;
+
::NThreading::TFuture<TNodeId> Move(
const TTransactionId& transaction,
const TYPath& sourcePath,
const TYPath& destinationPath,
- const TMoveOptions& options);
+ const TMoveOptions& options = {}) override;
+
::NThreading::TFuture<TNodeId> Link(
const TTransactionId& transaction,
const TYPath& targetPath,
const TYPath& linkPath,
- const TLinkOptions& options);
+ const TLinkOptions& options = {}) override;
+
::NThreading::TFuture<TLockId> Lock(
const TTransactionId& transaction,
const TYPath& path,
ELockMode mode,
- const TLockOptions& options);
+ const TLockOptions& options = {}) override;
+
::NThreading::TFuture<void> Unlock(
const TTransactionId& transaction,
const TYPath& path,
- const TUnlockOptions& options);
+ const TUnlockOptions& options = {}) override;
+
::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache(
const TTransactionId& transactionId,
const TString& md5Signature,
const TYPath& cachePath,
- const TGetFileFromCacheOptions& options);
+ const TGetFileFromCacheOptions& options = {}) override;
+
::NThreading::TFuture<TYPath> PutFileToCache(
const TTransactionId& transactionId,
const TYPath& filePath,
const TString& md5Signature,
const TYPath& cachePath,
- const TPutFileToCacheOptions& options);
+ const TPutFileToCacheOptions& options = {}) override;
+
::NThreading::TFuture<TCheckPermissionResponse> CheckPermission(
const TString& user,
EPermission permission,
const TYPath& path,
- const TCheckPermissionOptions& options);
+ const TCheckPermissionOptions& options = {}) override;
+
::NThreading::TFuture<TOperationAttributes> GetOperation(
const TOperationId& operationId,
- const TGetOperationOptions& options);
- ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId);
- ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId);
+ const TGetOperationOptions& options = {}) override;
+
+ ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId) override;
+
+ ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId) override;
+
::NThreading::TFuture<void> SuspendOperation(
const TOperationId& operationId,
- const TSuspendOperationOptions& options);
+ const TSuspendOperationOptions& options = {}) override;
+
::NThreading::TFuture<void> ResumeOperation(
const TOperationId& operationId,
- const TResumeOperationOptions& options);
+ const TResumeOperationOptions& options = {}) override;
+
::NThreading::TFuture<void> UpdateOperationParameters(
const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options);
- ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path);
+ const TUpdateOperationParametersOptions& options = {}) override;
+
+ ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path) override;
+
::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics(
const TTransactionId& transaction,
const TVector<TRichYPath>& paths,
- const TGetTableColumnarStatisticsOptions& options);
+ const TGetTableColumnarStatisticsOptions& options = {}) override;
+
::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions(
const TTransactionId& transaction,
const TVector<TRichYPath>& paths,
- const TGetTablePartitionsOptions& options);
+ const TGetTablePartitionsOptions& options = {}) override;
private:
struct TBatchItem {
@@ -182,7 +202,9 @@ private:
void AddRequest(TBatchItem batchItem);
private:
- TConfigPtr Config_;
+ const TClientContext Context_;
+
+ IRequestRetryPolicyPtr RequestRetryPolicy_;
TDeque<TBatchItem> BatchItemList_;
bool Executed_ = false;
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 53d2be114a..f25fe5a4ee 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -11,6 +11,7 @@
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <yt/cpp/mapreduce/interface/fluent.h>
+#include <yt/cpp/mapreduce/interface/fwd.h>
#include <yt/cpp/mapreduce/interface/operation.h>
#include <yt/cpp/mapreduce/interface/tvm.h>
@@ -925,6 +926,11 @@ ui64 THttpRawClient::GenerateTimestamp()
return NodeFromYsonString(responseInfo->GetResponse()).AsUint64();
}
+IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest()
+{
+ return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index e18e32a92e..f0e8378db3 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -335,6 +335,8 @@ public:
ui64 GenerateTimestamp() override;
+ IRawBatchRequestPtr CreateRawBatchRequest() override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index fd5c59ec20..6ba8a3f084 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -265,13 +265,13 @@ TVector<TRichYPath> CanonizeYPaths(
const TClientContext& context,
const TVector<TRichYPath>& paths)
{
- THttpRawBatchRequest batch(context.Config);
+ THttpRawBatchRequest batch(context, retryPolicy);
TVector<NThreading::TFuture<TRichYPath>> futures;
futures.reserve(paths.size());
for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
futures.push_back(batch.CanonizeYPath(paths[i]));
}
- batch.ExecuteBatch(retryPolicy, context);
+ batch.ExecuteBatch();
TVector<TRichYPath> result;
result.reserve(futures.size());
for (auto& future : futures) {
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 860d4ae939..9b37293b12 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -59,13 +59,13 @@ auto BatchTransform(
TBatchAdder batchAdder,
const TExecuteBatchOptions& executeBatchOptions = {})
{
- THttpRawBatchRequest batch(context.Config);
+ THttpRawBatchRequest batch(context, retryPolicy);
using TFuture = decltype(batchAdder(batch, *std::begin(src)));
TVector<TFuture> futures;
for (const auto& el : src) {
futures.push_back(batchAdder(batch, el));
}
- batch.ExecuteBatch(retryPolicy, context, executeBatchOptions);
+ batch.ExecuteBatch(executeBatchOptions);
using TDst = decltype(futures[0].ExtractValueSync());
TVector<TDst> result;
result.reserve(std::size(src));
diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
index 36a6935e82..1871ee80fb 100644
--- a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
+++ b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
@@ -73,7 +73,7 @@ TVector<TString> GetAllPathsFromRequestList(const TNode& requestList)
TEST(TBatchRequestImplTest, ParseResponse) {
TClientContext context;
- THttpRawBatchRequest batchRequest(context.Config);
+ THttpRawBatchRequest batchRequest(context, /*retryPolicy*/ nullptr);
EXPECT_EQ(batchRequest.BatchSize(), 0u);