diff options
| author | hiddenpath <[email protected]> | 2024-12-17 18:57:33 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2024-12-17 19:37:06 +0300 |
| commit | 82bd12196bba9102174e22e23d9fcce5897bdf76 (patch) | |
| tree | 1750f28771b73c26d91007ea3b1518f7ffb453f6 /yt/cpp/mapreduce/client/client.cpp | |
| parent | 174f410da8f66e7525c017656f485eb45a978a09 (diff) | |
[yt/cpp/mapreduce] YT-23616: Move some http methods to THttpRawClient
commit_hash:4e2845ba995aaf7bbae2c24735ceb099a116c89d
Diffstat (limited to 'yt/cpp/mapreduce/client/client.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 184 |
1 files changed, 55 insertions, 129 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 3177ea9d6a5..b7fbb7f0807 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1051,14 +1051,11 @@ void TClient::MountTable( const TMountTableOptions& options) { CheckShutdown(); - - THttpHeader header("POST", "mount_table"); - SetTabletParams(header, path, options); - if (options.CellId_) { - header.AddParameter("cell_id", GetGuidAsString(*options.CellId_)); - } - header.AddParameter("freeze", options.Freeze_); - RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &options] (TMutationId& mutationId) { + RawClient_->MountTable(mutationId, path, options); + }); } void TClient::UnmountTable( @@ -1066,11 +1063,11 @@ void TClient::UnmountTable( const TUnmountTableOptions& options) { CheckShutdown(); - - THttpHeader header("POST", "unmount_table"); - SetTabletParams(header, path, options); - header.AddParameter("force", options.Force_); - RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &options] (TMutationId& mutationId) { + RawClient_->UnmountTable(mutationId, path, options); + }); } void TClient::RemountTable( @@ -1078,10 +1075,11 @@ void TClient::RemountTable( const TRemountTableOptions& options) { CheckShutdown(); - - THttpHeader header("POST", "remount_table"); - SetTabletParams(header, path, options); - RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &options] (TMutationId& mutationId) { + RawClient_->RemountTable(mutationId, path, options); + }); } void TClient::FreezeTable( @@ -1106,11 +1104,11 @@ void TClient::ReshardTable( const TReshardTableOptions& options) { CheckShutdown(); - - THttpHeader header("POST", "reshard_table"); - SetTabletParams(header, path, options); - header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys)); - RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &keys, &options] (TMutationId& mutationId) { + RawClient_->ReshardTableByPivotKeys(mutationId, path, keys, options); + }); } void TClient::ReshardTable( @@ -1119,11 +1117,11 @@ void TClient::ReshardTable( const TReshardTableOptions& options) { CheckShutdown(); - - THttpHeader header("POST", "reshard_table"); - SetTabletParams(header, path, options); - header.AddParameter("tablet_count", tabletCount); - RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, tabletCount, &options] (TMutationId& mutationId) { + RawClient_->ReshardTableByTabletCount(mutationId, path, tabletCount, options); + }); } void TClient::InsertRows( @@ -1132,16 +1130,11 @@ void TClient::InsertRows( const TInsertRowsOptions& options) { CheckShutdown(); - - THttpHeader header("PUT", "insert_rows"); - header.SetInputFormat(TFormat::YsonBinary()); - // TODO: use corresponding raw request - header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options)); - - auto body = NodeListToYsonString(rows); - TRequestConfig config; - config.IsHeavy = true; - RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &rows, &options] (TMutationId /*mutationId*/) { + RawClient_->InsertRows(path, rows, options); + }); } void TClient::DeleteRows( @@ -1160,16 +1153,11 @@ void TClient::TrimRows( const TTrimRowsOptions& options) { CheckShutdown(); - - THttpHeader header("POST", "trim_rows"); - header.AddParameter("trimmed_row_count", rowCount); - header.AddParameter("tablet_index", tabletIndex); - // TODO: use corresponding raw request - header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options)); - - TRequestConfig config; - config.IsHeavy = true; - RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, tabletIndex, rowCount, &options] (TMutationId /*mutationId*/) { + RawClient_->TrimRows(path, tabletIndex, rowCount, options); + }); } TNode::TListType TClient::LookupRows( @@ -1178,31 +1166,11 @@ TNode::TListType TClient::LookupRows( const TLookupRowsOptions& options) { CheckShutdown(); - - Y_UNUSED(options); - THttpHeader header("PUT", "lookup_rows"); - header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion)); - header.SetInputFormat(TFormat::YsonBinary()); - header.SetOutputFormat(TFormat::YsonBinary()); - - header.MergeParameters(BuildYsonNodeFluently().BeginMap() - .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) { - fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds())); - }) - .Item("keep_missing_rows").Value(options.KeepMissingRows_) - .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) { - fluent.Item("versioned").Value(*options.Versioned_); - }) - .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) { - fluent.Item("column_names").Value(*options.Columns_); - }) - .EndMap()); - - auto body = NodeListToYsonString(keys); - TRequestConfig config; - config.IsHeavy = true; - auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config); - return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList(); + return RequestWithRetry<TNode::TListType>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &keys, &options] (TMutationId /*mutationId*/) { + return RawClient_->LookupRows(path, keys, options); + }); } TNode::TListType TClient::SelectRows( @@ -1210,32 +1178,11 @@ TNode::TListType TClient::SelectRows( const TSelectRowsOptions& options) { CheckShutdown(); - - THttpHeader header("GET", "select_rows"); - header.SetInputFormat(TFormat::YsonBinary()); - header.SetOutputFormat(TFormat::YsonBinary()); - - header.MergeParameters(BuildYsonNodeFluently().BeginMap() - .Item("query").Value(query) - .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) { - fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds())); - }) - .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) { - fluent.Item("input_row_limit").Value(*options.InputRowLimit_); - }) - .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) { - fluent.Item("output_row_limit").Value(*options.OutputRowLimit_); - }) - .Item("range_expansion_limit").Value(options.RangeExpansionLimit_) - .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_) - .Item("verbose_logging").Value(options.VerboseLogging_) - .Item("enable_code_cache").Value(options.EnableCodeCache_) - .EndMap()); - - TRequestConfig config; - config.IsHeavy = true; - auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config); - return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList(); + return RequestWithRetry<TNode::TListType>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &query, &options] (TMutationId /*mutationId*/) { + return RawClient_->SelectRows(query, options); + }); } void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options) @@ -1247,27 +1194,21 @@ void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableRe ui64 TClient::GenerateTimestamp() { CheckShutdown(); - THttpHeader header("GET", "generate_timestamp"); - TRequestConfig config; - config.IsHeavy = true; - auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config); - return NodeFromYsonString(requestResult.Response).AsUint64(); + return RequestWithRetry<ui64>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this] (TMutationId /*mutationId*/) { + return RawClient_->GenerateTimestamp(); + }); } TAuthorizationInfo TClient::WhoAmI() { CheckShutdown(); - - THttpHeader header("GET", "auth/whoami", /* isApi = */ false); - auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header); - TAuthorizationInfo result; - - NJson::TJsonValue jsonValue; - bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /* throwOnError = */ true); - Y_ABORT_UNLESS(ok); - result.Login = jsonValue["login"].GetString(); - result.Realm = jsonValue["realm"].GetString(); - return result; + return RequestWithRetry<TAuthorizationInfo>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this] (TMutationId /*mutationId*/) { + return RawClient_->WhoAmI(); + }); } TOperationAttributes TClient::GetOperation( @@ -1472,21 +1413,6 @@ TClientPtr TClient::GetParentClientImpl() return this; } -template <class TOptions> -void TClient::SetTabletParams( - THttpHeader& header, - const TYPath& path, - const TOptions& options) -{ - header.AddPath(AddPathPrefix(path, Context_.Config->Prefix)); - if (options.FirstTabletIndex_) { - header.AddParameter("first_tablet_index", *options.FirstTabletIndex_); - } - if (options.LastTabletIndex_) { - header.AddParameter("last_tablet_index", *options.LastTabletIndex_); - } -} - void TClient::CheckShutdown() const { if (Shutdown_) { |
