diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-17 18:57:33 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-17 19:37:06 +0300 |
commit | 82bd12196bba9102174e22e23d9fcce5897bdf76 (patch) | |
tree | 1750f28771b73c26d91007ea3b1518f7ffb453f6 /yt/cpp/mapreduce/raw_client | |
parent | 174f410da8f66e7525c017656f485eb45a978a09 (diff) | |
download | ydb-82bd12196bba9102174e22e23d9fcce5897bdf76.tar.gz |
[yt/cpp/mapreduce] YT-23616: Move some http methods to THttpRawClient
commit_hash:4e2845ba995aaf7bbae2c24735ceb099a116c89d
Diffstat (limited to 'yt/cpp/mapreduce/raw_client')
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 184 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 56 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h | 18 |
4 files changed, 261 insertions, 6 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index d95a513545..20b03a5ad9 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> +#include <yt/cpp/mapreduce/interface/fluent.h> #include <yt/cpp/mapreduce/interface/operation.h> #include <yt/cpp/mapreduce/interface/tvm.h> @@ -554,6 +555,189 @@ std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace( return result; } +void THttpRawClient::MountTable( + TMutationId& mutationId, + const TYPath& path, + const TMountTableOptions& options) +{ + THttpHeader header("POST", "mount_table"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); + if (options.CellId_) { + header.AddParameter("cell_id", GetGuidAsString(*options.CellId_)); + } + header.AddParameter("freeze", options.Freeze_); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::UnmountTable( + TMutationId& mutationId, + const TYPath& path, + const TUnmountTableOptions& options) +{ + THttpHeader header("POST", "unmount_table"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); + header.AddParameter("force", options.Force_); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::RemountTable( + TMutationId& mutationId, + const TYPath& path, + const TRemountTableOptions& options) +{ + THttpHeader header("POST", "remount_table"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::ReshardTableByPivotKeys( + TMutationId& mutationId, + const TYPath& path, + const TVector<TKey>& keys, + const TReshardTableOptions& options) +{ + THttpHeader header("POST", "reshard_table"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); + header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys)); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::ReshardTableByTabletCount( + TMutationId& mutationId, + const TYPath& path, + i64 tabletCount, + const TReshardTableOptions& options) +{ + THttpHeader header("POST", "reshard_table"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); + header.AddParameter("tablet_count", tabletCount); + RequestWithoutRetry(Context_, mutationId, header); +} + +void THttpRawClient::InsertRows( + const TYPath& path, + const TNode::TListType& rows, + const TInsertRowsOptions& options) +{ + TMutationId mutationId; + THttpHeader header("PUT", "insert_rows"); + header.SetInputFormat(TFormat::YsonBinary()); + header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options)); + auto body = NodeListToYsonString(rows); + TRequestConfig config; + config.IsHeavy = true; + RequestWithoutRetry(Context_, mutationId, header, body, config); +} + +void THttpRawClient::TrimRows( + const TYPath& path, + i64 tabletIndex, + i64 rowCount, + const TTrimRowsOptions& options) +{ + TMutationId mutationId; + THttpHeader header("POST", "trim_rows"); + header.AddParameter("trimmed_row_count", rowCount); + header.AddParameter("tablet_index", tabletIndex); + header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options)); + TRequestConfig config; + config.IsHeavy = true; + RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); +} + +TNode::TListType THttpRawClient::LookupRows( + const TYPath& path, + const TNode::TListType& keys, + const TLookupRowsOptions& options) +{ + TMutationId mutationId; + THttpHeader header("PUT", "lookup_rows"); + header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion)); + header.SetInputFormat(TFormat::YsonBinary()); + header.SetOutputFormat(TFormat::YsonBinary()); + + header.MergeParameters(BuildYsonNodeFluently().BeginMap() + .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) { + fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds())); + }) + .Item("keep_missing_rows").Value(options.KeepMissingRows_) + .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) { + fluent.Item("versioned").Value(*options.Versioned_); + }) + .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) { + fluent.Item("column_names").Value(*options.Columns_); + }) + .EndMap()); + + auto body = NodeListToYsonString(keys); + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, body, config); + return NodeFromYsonString(responseInfo.Response, ::NYson::EYsonType::ListFragment).AsList(); +} + +TNode::TListType THttpRawClient::SelectRows( + const TString& query, + const TSelectRowsOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "select_rows"); + header.SetInputFormat(TFormat::YsonBinary()); + header.SetOutputFormat(TFormat::YsonBinary()); + + header.MergeParameters(BuildYsonNodeFluently().BeginMap() + .Item("query").Value(query) + .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) { + fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds())); + }) + .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) { + fluent.Item("input_row_limit").Value(*options.InputRowLimit_); + }) + .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) { + fluent.Item("output_row_limit").Value(*options.OutputRowLimit_); + }) + .Item("range_expansion_limit").Value(options.RangeExpansionLimit_) + .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_) + .Item("verbose_logging").Value(options.VerboseLogging_) + .Item("enable_code_cache").Value(options.EnableCodeCache_) + .EndMap()); + + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + return NodeFromYsonString(responseInfo.Response, ::NYson::EYsonType::ListFragment).AsList(); +} + +ui64 THttpRawClient::GenerateTimestamp() +{ + TMutationId mutationId; + THttpHeader header("GET", "generate_timestamp"); + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); + return NodeFromYsonString(responseInfo.Response).AsUint64(); +} + +TAuthorizationInfo THttpRawClient::WhoAmI() +{ + TMutationId mutationId; + THttpHeader header("GET", "auth/whoami", /*isApi*/ false); + auto requestResult = RequestWithoutRetry(Context_, mutationId, header); + TAuthorizationInfo result; + + NJson::TJsonValue jsonValue; + bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /*throwOnError*/ true); + Y_ABORT_UNLESS(ok); + result.Login = jsonValue["login"].GetString(); + result.Realm = jsonValue["realm"].GetString(); + return result; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index 77f10c1c65..55c5cee4c7 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -2,6 +2,7 @@ #include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/interface/client.h> #include <yt/cpp/mapreduce/interface/client_method_options.h> #include <yt/cpp/mapreduce/interface/raw_client.h> @@ -188,6 +189,61 @@ public: const TOperationId& operationId, const TGetJobTraceOptions& options = {}) override; + // Tables + + void MountTable( + TMutationId& mutationId, + const TYPath& path, + const TMountTableOptions& options = {}) override; + + void UnmountTable( + TMutationId& mutationId, + const TYPath& path, + const TUnmountTableOptions& options = {}) override; + + void RemountTable( + TMutationId& mutationId, + const TYPath& path, + const TRemountTableOptions& options = {}) override; + + void ReshardTableByPivotKeys( + TMutationId& mutationId, + const TYPath& path, + const TVector<TKey>& keys, + const TReshardTableOptions& options = {}) override; + + void ReshardTableByTabletCount( + TMutationId& mutationId, + const TYPath& path, + i64 tabletCount, + const TReshardTableOptions& options = {}) override; + + void InsertRows( + const TYPath& path, + const TNode::TListType& rows, + const TInsertRowsOptions& options = {}) override; + + void TrimRows( + const TYPath& path, + i64 tabletIndex, + i64 rowCount, + const TTrimRowsOptions& options = {}) override; + + TNode::TListType LookupRows( + const TYPath& path, + const TNode::TListType& keys, + const TLookupRowsOptions& options = {}) override; + + TNode::TListType SelectRows( + const TString& query, + const TSelectRowsOptions& options = {}) override; + + // Misc + + ui64 GenerateTimestamp() override; + + TAuthorizationInfo WhoAmI() override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index 8f9e79de77..8474bd0edc 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -304,12 +304,11 @@ TNode SerializeParamsForUnlock( const TTransactionId& transactionId, const TString& pathPrefix, const TYPath& path, - const TUnlockOptions& options) + const TUnlockOptions& /*options*/) { TNode result; SetTransactionIdParam(&result, transactionId); SetPathParam(&result, pathPrefix, path); - Y_UNUSED(options); return result; } @@ -449,11 +448,10 @@ TNode SerializeParamsForSuspendOperation( TNode SerializeParamsForResumeOperation( const TOperationId& operationId, - const TResumeOperationOptions& options) + const TResumeOperationOptions& /*options*/) { TNode result; SetOperationIdParam(&result, operationId); - Y_UNUSED(options); return result; } @@ -863,9 +861,8 @@ TNode SerializeParamsForGetTabletInfos( const TString& pathPrefix, const TYPath& path, const TVector<int>& tabletIndexes, - const TGetTabletInfosOptions& options) + const TGetTabletInfosOptions& /*options*/) { - Y_UNUSED(options); TNode result; SetPathParam(&result, pathPrefix, path); result["tablet_indexes"] = TNode::CreateList(); diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index 69a4888267..655198248c 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -1,5 +1,6 @@ #pragma once +#include <yt/cpp/mapreduce/common/helpers.h> #include <yt/cpp/mapreduce/interface/fwd.h> #include <yt/cpp/mapreduce/interface/client_method_options.h> @@ -228,6 +229,23 @@ TNode SerializeParamsForStartTransaction( TDuration txTimeout, const TStartTransactionOptions& options); +template <typename TOptions> +TNode SerializeTabletParams( + const TString& pathPrefix, + const TYPath& path, + const TOptions& options) +{ + TNode result; + result["path"] = AddPathPrefix(path, pathPrefix); + if (options.FirstTabletIndex_) { + result["first_tablet_index"] = *options.FirstTabletIndex_; + } + if (options.LastTabletIndex_) { + result["last_tablet_index"] = *options.LastTabletIndex_; + } + return result; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail::NRawClient |