summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/client.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-17 18:57:33 +0300
committerhiddenpath <[email protected]>2024-12-17 19:37:06 +0300
commit82bd12196bba9102174e22e23d9fcce5897bdf76 (patch)
tree1750f28771b73c26d91007ea3b1518f7ffb453f6 /yt/cpp/mapreduce/client/client.cpp
parent174f410da8f66e7525c017656f485eb45a978a09 (diff)
[yt/cpp/mapreduce] YT-23616: Move some http methods to THttpRawClient
commit_hash:4e2845ba995aaf7bbae2c24735ceb099a116c89d
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp184
1 files changed, 55 insertions, 129 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 3177ea9d6a5..b7fbb7f0807 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1051,14 +1051,11 @@ void TClient::MountTable(
const TMountTableOptions& options)
{
CheckShutdown();
-
- THttpHeader header("POST", "mount_table");
- SetTabletParams(header, path, options);
- if (options.CellId_) {
- header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
- }
- header.AddParameter("freeze", options.Freeze_);
- RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &options] (TMutationId& mutationId) {
+ RawClient_->MountTable(mutationId, path, options);
+ });
}
void TClient::UnmountTable(
@@ -1066,11 +1063,11 @@ void TClient::UnmountTable(
const TUnmountTableOptions& options)
{
CheckShutdown();
-
- THttpHeader header("POST", "unmount_table");
- SetTabletParams(header, path, options);
- header.AddParameter("force", options.Force_);
- RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &options] (TMutationId& mutationId) {
+ RawClient_->UnmountTable(mutationId, path, options);
+ });
}
void TClient::RemountTable(
@@ -1078,10 +1075,11 @@ void TClient::RemountTable(
const TRemountTableOptions& options)
{
CheckShutdown();
-
- THttpHeader header("POST", "remount_table");
- SetTabletParams(header, path, options);
- RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &options] (TMutationId& mutationId) {
+ RawClient_->RemountTable(mutationId, path, options);
+ });
}
void TClient::FreezeTable(
@@ -1106,11 +1104,11 @@ void TClient::ReshardTable(
const TReshardTableOptions& options)
{
CheckShutdown();
-
- THttpHeader header("POST", "reshard_table");
- SetTabletParams(header, path, options);
- header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
- RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &keys, &options] (TMutationId& mutationId) {
+ RawClient_->ReshardTableByPivotKeys(mutationId, path, keys, options);
+ });
}
void TClient::ReshardTable(
@@ -1119,11 +1117,11 @@ void TClient::ReshardTable(
const TReshardTableOptions& options)
{
CheckShutdown();
-
- THttpHeader header("POST", "reshard_table");
- SetTabletParams(header, path, options);
- header.AddParameter("tablet_count", tabletCount);
- RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, tabletCount, &options] (TMutationId& mutationId) {
+ RawClient_->ReshardTableByTabletCount(mutationId, path, tabletCount, options);
+ });
}
void TClient::InsertRows(
@@ -1132,16 +1130,11 @@ void TClient::InsertRows(
const TInsertRowsOptions& options)
{
CheckShutdown();
-
- THttpHeader header("PUT", "insert_rows");
- header.SetInputFormat(TFormat::YsonBinary());
- // TODO: use corresponding raw request
- header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
-
- auto body = NodeListToYsonString(rows);
- TRequestConfig config;
- config.IsHeavy = true;
- RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &rows, &options] (TMutationId /*mutationId*/) {
+ RawClient_->InsertRows(path, rows, options);
+ });
}
void TClient::DeleteRows(
@@ -1160,16 +1153,11 @@ void TClient::TrimRows(
const TTrimRowsOptions& options)
{
CheckShutdown();
-
- THttpHeader header("POST", "trim_rows");
- header.AddParameter("trimmed_row_count", rowCount);
- header.AddParameter("tablet_index", tabletIndex);
- // TODO: use corresponding raw request
- header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
-
- TRequestConfig config;
- config.IsHeavy = true;
- RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, tabletIndex, rowCount, &options] (TMutationId /*mutationId*/) {
+ RawClient_->TrimRows(path, tabletIndex, rowCount, options);
+ });
}
TNode::TListType TClient::LookupRows(
@@ -1178,31 +1166,11 @@ TNode::TListType TClient::LookupRows(
const TLookupRowsOptions& options)
{
CheckShutdown();
-
- Y_UNUSED(options);
- THttpHeader header("PUT", "lookup_rows");
- header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
- header.SetInputFormat(TFormat::YsonBinary());
- header.SetOutputFormat(TFormat::YsonBinary());
-
- header.MergeParameters(BuildYsonNodeFluently().BeginMap()
- .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
- })
- .Item("keep_missing_rows").Value(options.KeepMissingRows_)
- .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("versioned").Value(*options.Versioned_);
- })
- .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("column_names").Value(*options.Columns_);
- })
- .EndMap());
-
- auto body = NodeListToYsonString(keys);
- TRequestConfig config;
- config.IsHeavy = true;
- auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
- return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
+ return RequestWithRetry<TNode::TListType>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &keys, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->LookupRows(path, keys, options);
+ });
}
TNode::TListType TClient::SelectRows(
@@ -1210,32 +1178,11 @@ TNode::TListType TClient::SelectRows(
const TSelectRowsOptions& options)
{
CheckShutdown();
-
- THttpHeader header("GET", "select_rows");
- header.SetInputFormat(TFormat::YsonBinary());
- header.SetOutputFormat(TFormat::YsonBinary());
-
- header.MergeParameters(BuildYsonNodeFluently().BeginMap()
- .Item("query").Value(query)
- .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
- })
- .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
- })
- .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
- })
- .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
- .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
- .Item("verbose_logging").Value(options.VerboseLogging_)
- .Item("enable_code_cache").Value(options.EnableCodeCache_)
- .EndMap());
-
- TRequestConfig config;
- config.IsHeavy = true;
- auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
- return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
+ return RequestWithRetry<TNode::TListType>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &query, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->SelectRows(query, options);
+ });
}
void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options)
@@ -1247,27 +1194,21 @@ void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableRe
ui64 TClient::GenerateTimestamp()
{
CheckShutdown();
- THttpHeader header("GET", "generate_timestamp");
- TRequestConfig config;
- config.IsHeavy = true;
- auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
- return NodeFromYsonString(requestResult.Response).AsUint64();
+ return RequestWithRetry<ui64>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this] (TMutationId /*mutationId*/) {
+ return RawClient_->GenerateTimestamp();
+ });
}
TAuthorizationInfo TClient::WhoAmI()
{
CheckShutdown();
-
- THttpHeader header("GET", "auth/whoami", /* isApi = */ false);
- auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
- TAuthorizationInfo result;
-
- NJson::TJsonValue jsonValue;
- bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /* throwOnError = */ true);
- Y_ABORT_UNLESS(ok);
- result.Login = jsonValue["login"].GetString();
- result.Realm = jsonValue["realm"].GetString();
- return result;
+ return RequestWithRetry<TAuthorizationInfo>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this] (TMutationId /*mutationId*/) {
+ return RawClient_->WhoAmI();
+ });
}
TOperationAttributes TClient::GetOperation(
@@ -1472,21 +1413,6 @@ TClientPtr TClient::GetParentClientImpl()
return this;
}
-template <class TOptions>
-void TClient::SetTabletParams(
- THttpHeader& header,
- const TYPath& path,
- const TOptions& options)
-{
- header.AddPath(AddPathPrefix(path, Context_.Config->Prefix));
- if (options.FirstTabletIndex_) {
- header.AddParameter("first_tablet_index", *options.FirstTabletIndex_);
- }
- if (options.LastTabletIndex_) {
- header.AddParameter("last_tablet_index", *options.LastTabletIndex_);
- }
-}
-
void TClient::CheckShutdown() const
{
if (Shutdown_) {