aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2024-12-13 20:18:29 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2024-12-13 20:58:57 +0300
commit91a8ffd57d8783a3d940c9506254fa7e2012e1ec (patch)
treee482755ebffea84fefd270a9a9d2a32b604d18ad
parent8afd0e6dc9d41134a0cccfd6b6c5fe843efd80fb (diff)
downloadydb-91a8ffd57d8783a3d940c9506254fa7e2012e1ec.tar.gz
yt/cpp/mapreduce: move Create to THttpRawClient
commit_hash:9ca8428c322034064576bb56f74e704425ce7de9
-rw-r--r--yt/cpp/mapreduce/client/client.cpp20
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp54
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp41
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h7
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp13
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h7
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp29
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h8
8 files changed, 98 insertions, 81 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 24aed7fb77..45a5bf1d47 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -112,7 +112,11 @@ TNodeId TClientBase::Create(
ENodeType type,
const TCreateOptions& options)
{
- return NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, type, options);
+ return RequestWithRetry<TNodeId>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &type, &options] (TMutationId& mutationId) {
+ return RawClient_->Create(mutationId, TransactionId_, path, type, options);
+ });
}
void TClientBase::Remove(
@@ -337,8 +341,11 @@ IFileWriterPtr TClientBase::CreateFileWriter(
return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_);
});
if (!exists) {
- NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE,
- TCreateOptions().IgnoreExisting(true));
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &realPath] (TMutationId& mutationId) {
+ RawClient_->Create(mutationId, TransactionId_, realPath.Path_, NT_FILE, TCreateOptions().IgnoreExisting(true));
+ });
}
return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
@@ -737,8 +744,11 @@ THolder<TClientWriter> TClientBase::CreateClientWriter(
return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_);
});
if (!exists) {
- NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE,
- TCreateOptions().IgnoreExisting(true));
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &realPath] (TMutationId& mutataionId) {
+ RawClient_->Create(mutataionId, TransactionId_, realPath.Path_, NT_TABLE, TCreateOptions().IgnoreExisting(true));
+ });
}
return MakeHolder<TClientWriter>(
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 5bb67e3059..0feaa016ed 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -973,26 +973,32 @@ template <typename TSpec>
void CreateDebugOutputTables(const TSpec& spec, const TOperationPreparer& preparer)
{
if (spec.StderrTablePath_.Defined()) {
- NYT::NDetail::Create(
+ RequestWithRetry<void>(
preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- preparer.GetContext(),
- TTransactionId(),
- *spec.StderrTablePath_,
- NT_TABLE,
- TCreateOptions()
- .IgnoreExisting(true)
- .Recursive(true));
+ [&spec, &preparer] (TMutationId& mutationId) {
+ preparer.GetClient()->GetRawClient()->Create(
+ mutationId,
+ TTransactionId(),
+ *spec.StderrTablePath_,
+ NT_TABLE,
+ TCreateOptions()
+ .IgnoreExisting(true)
+ .Recursive(true));
+ });
}
if (spec.CoreTablePath_.Defined()) {
- NYT::NDetail::Create(
+ RequestWithRetry<void>(
preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- preparer.GetContext(),
- TTransactionId(),
- *spec.CoreTablePath_,
- NT_TABLE,
- TCreateOptions()
- .IgnoreExisting(true)
- .Recursive(true));
+ [&spec, &preparer] (TMutationId& mutationId) {
+ preparer.GetClient()->GetRawClient()->Create(
+ mutationId,
+ TTransactionId(),
+ *spec.CoreTablePath_,
+ NT_TABLE,
+ TCreateOptions()
+ .IgnoreExisting(true)
+ .Recursive(true));
+ });
}
}
@@ -1003,12 +1009,18 @@ void CreateOutputTable(
Y_ENSURE(path.Path_, "Output table is not set");
if (!path.Create_.Defined()) {
// If `create` attribute is defined
- Create(
+ RequestWithRetry<void>(
preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- preparer.GetContext(), preparer.GetTransactionId(), path.Path_, NT_TABLE,
- TCreateOptions()
- .IgnoreExisting(true)
- .Recursive(true));
+ [&preparer, &path] (TMutationId& mutationId) {
+ preparer.GetClient()->GetRawClient()->Create(
+ mutationId,
+ preparer.GetTransactionId(),
+ path.Path_,
+ NT_TABLE,
+ TCreateOptions()
+ .IgnoreExisting(true)
+ .Recursive(true));
+ });
}
}
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index eb7264425c..b5263420f0 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -490,15 +490,18 @@ TYPath TJobPreparer::GetCachePath() const
void TJobPreparer::CreateStorage() const
{
- Create(
+ RequestWithRetry<void>(
OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- OperationPreparer_.GetContext(),
- Options_.FileStorageTransactionId_,
- GetCachePath(),
- NT_MAP,
- TCreateOptions()
- .IgnoreExisting(true)
- .Recursive(true));
+ [this] (TMutationId& mutationId) {
+ RawClient_->Create(
+ mutationId,
+ Options_.FileStorageTransactionId_,
+ GetCachePath(),
+ NT_MAP,
+ TCreateOptions()
+ .IgnoreExisting(true)
+ .Recursive(true));
+ });
}
int TJobPreparer::GetFileCacheReplicationFactor() const
@@ -517,17 +520,19 @@ void TJobPreparer::CreateFileInCypress(const TString& path) const
attributes["expiration_timeout"] = Options_.FileExpirationTimeout_->MilliSeconds();
}
- Create(
+ RequestWithRetry<void>(
OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- OperationPreparer_.GetContext(),
- Options_.FileStorageTransactionId_,
- path,
- NT_FILE,
- TCreateOptions()
- .IgnoreExisting(true)
- .Recursive(true)
- .Attributes(attributes)
- );
+ [this, &path, &attributes] (TMutationId& mutationId) {
+ RawClient_->Create(
+ mutationId,
+ Options_.FileStorageTransactionId_,
+ path,
+ NT_FILE,
+ TCreateOptions()
+ .IgnoreExisting(true)
+ .Recursive(true)
+ .Attributes(attributes));
+ });
}
TString TJobPreparer::PutFileToCypressCache(
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 85b7d1aa54..e5c672929c 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -43,6 +43,13 @@ public:
const TYPath& path,
const TNode::TMapType& value,
const TMultisetAttributesOptions& options = {}) = 0;
+
+ virtual TNodeId Create(
+ TMutationId& mutatatonId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const ENodeType& type,
+ const TCreateOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index cc09682377..034dff7f33 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -81,6 +81,19 @@ void THttpRawClient::MultisetAttributes(
RequestWithoutRetry(Context_, mutationId, header, body);
}
+TNodeId THttpRawClient::Create(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const ENodeType& type,
+ const TCreateOptions& options)
+{
+ THttpHeader header("POST", "create");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index 3b31aa519c..0908794168 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -49,6 +49,13 @@ public:
const TNode::TMapType& value,
const TMultisetAttributesOptions& options = {}) override;
+ TNodeId Create(
+ TMutationId& mutatatonId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const ENodeType& type,
+ const TCreateOptions& options = {}) override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index 13100a2907..9d643afc91 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -79,35 +79,6 @@ void ExecuteBatch(
}
}
-void Set(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TNode& value,
- const TSetOptions& options)
-{
- THttpHeader header("PUT", "set");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForSet(transactionId, context.Config->Prefix, path, options));
- auto body = NodeToYsonString(value);
- RetryRequestWithPolicy(retryPolicy, context, header, body);
-}
-
-TNodeId Create(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const ENodeType& type,
- const TCreateOptions& options)
-{
- THttpHeader header("POST", "create");
- header.AddMutationId();
- header.MergeParameters(SerializeParamsForCreate(transactionId, context.Config->Prefix, path, type, options));
- return ParseGuidFromResponse(RetryRequestWithPolicy(retryPolicy, context, header).Response);
-}
-
TNodeId CopyWithoutRetries(
const TClientContext& context,
const TTransactionId& transactionId,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 96636808f3..9a02e34951 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -39,14 +39,6 @@ void ExecuteBatch(
// Cypress
//
-TNodeId Create(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const ENodeType& type,
- const TCreateOptions& options = TCreateOptions());
-
TNodeId CopyWithoutRetries(
const TClientContext& context,
const TTransactionId& transactionId,