diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-03-02 11:21:41 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-03-02 11:21:41 +0000 |
commit | 8322fb9ff849dc37fa752d5aba04ef9e7ba2a7c9 (patch) | |
tree | 0d722004f839a80a1c4a02aa4dd2704daae5adec /yt/cpp | |
parent | 22b98a26c01070ae980dc5477323d8d4152aabbc (diff) | |
parent | 6678165e016ba474f1b8dd6d49af92b0d46350b9 (diff) | |
download | ydb-8322fb9ff849dc37fa752d5aba04ef9e7ba2a7c9.tar.gz |
Merge branch 'rightlib' into merge-libs-250302-1120
Diffstat (limited to 'yt/cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 37 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 19 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/skiff.cpp | 15 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/skiff.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/structured_table_formats.cpp | 25 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/structured_table_formats.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/context.cpp | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/context.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http_client/raw_client.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http_client/raw_client.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http_client/raw_requests.h | 48 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/common.cpp | 22 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/common.h | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 7 |
16 files changed, 173 insertions, 29 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 6e58903308..bf9936a64a 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1496,8 +1496,7 @@ IClientPtr TClient::GetParentClient(bool ignoreGlobalTx) RawClient_, Context_, TTransactionId(), - ClientRetryPolicy_ - ); + ClientRetryPolicy_); } else { return this; } @@ -1510,15 +1509,10 @@ void TClient::CheckShutdown() const } } -TClientContext CreateClientContext( - const TString& serverName, - const TCreateClientOptions& options) +void SetupClusterContext( + TClientContext& context, + const TString& serverName) { - TClientContext context; - context.Config = options.Config_ ? options.Config_ : TConfig::Get(); - context.TvmOnly = options.TvmOnly_; - context.ProxyAddress = options.ProxyAddress_; - context.ServerName = serverName; ApplyProxyUrlAliasingRules(context.ServerName); @@ -1531,9 +1525,8 @@ TClientContext CreateClientContext( static constexpr char httpUrlSchema[] = "http://"; static constexpr char httpsUrlSchema[] = "https://"; - if (options.UseTLS_) { - context.UseTLS = *options.UseTLS_; - } else { + + if (!context.UseTLS) { context.UseTLS = context.ServerName.StartsWith(httpsUrlSchema); } @@ -1555,9 +1548,25 @@ TClientContext CreateClientContext( if (context.ServerName.find(':') == TString::npos) { context.ServerName = CreateHostNameWithPort(context.ServerName, context); } - if (options.TvmOnly_) { + if (context.TvmOnly) { context.ServerName = Format("tvm.%v", context.ServerName); } +} + +TClientContext CreateClientContext( + const TString& serverName, + const TCreateClientOptions& options) +{ + TClientContext context; + context.Config = options.Config_ ? options.Config_ : TConfig::Get(); + context.TvmOnly = options.TvmOnly_; + context.ProxyAddress = options.ProxyAddress_; + + if (options.UseTLS_) { + context.UseTLS = *options.UseTLS_; + } + + SetupClusterContext(context, serverName); if (options.ProxyRole_) { context.Config->Hosts = "hosts?role=" + *options.ProxyRole_; diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 9cd650bd2d..ef0741044c 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -509,6 +509,10 @@ private: //////////////////////////////////////////////////////////////////////////////// +void SetupClusterContext( + TClientContext& context, + const TString& serverName); + TClientContext CreateClientContext( const TString& serverName, const TCreateClientOptions& options); diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 61369859bc..37d39345f1 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -69,7 +69,7 @@ TClientReader::TClientReader( if (useFormatFromTableAttributes) { auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_; - auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, transactionId2, Path_); + auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, Context_, transactionId2, Path_); if (newFormat) { Format_->Config = *newFormat; } diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 76a55b436f..4d0cc07bed 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -220,11 +220,24 @@ TStructuredJobTableList ApplyProtobufColumnFilters( return tableList; } - auto isDynamic = NRawClient::BatchTransform( + TVector<TRichYPath> tableListPaths; + for (const auto& table: tableList) { + Y_ABORT_UNLESS(table.RichYPath, "Cannot get path to apply column filters"); + tableListPaths.emplace_back(*table.RichYPath); + } + + auto isDynamic = NRawClient::RemoteClustersBatchTransform( preparer.GetClient()->GetRawClient(), - tableList, + preparer.GetContext(), + tableListPaths, [&] (IRawBatchRequestPtr batch, const auto& table) { - return batch->Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions()); + // In case of external cluster, we can't use the current transaction + // since it is unknown for the external cluster. + // Hence, we should take a global transaction. + if (table.Cluster_ && !table.Cluster_->empty()) { + return batch->Get(TTransactionId(), table.Path_ + "/@dynamic", TGetOptions()); + } + return batch->Get(preparer.GetTransactionId(), table.Path_ + "/@dynamic", TGetOptions()); }); auto newTableList = tableList; diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp index 537a28f059..e9c5ba2680 100644 --- a/yt/cpp/mapreduce/client/skiff.cpp +++ b/yt/cpp/mapreduce/client/skiff.cpp @@ -279,6 +279,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) { NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( const IRawClientPtr& rawClient, + const TClientContext& context, const TTransactionId& transactionId, ENodeReaderFormat nodeReaderFormat, const TVector<TRichYPath>& tablePaths, @@ -301,17 +302,23 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( } } - auto nodes = NRawClient::BatchTransform( + auto nodes = RemoteClustersBatchTransform( rawClient, - NRawClient::CanonizeYPaths(rawClient, tablePaths), + context, + tablePaths, [&] (IRawBatchRequestPtr batch, const TRichYPath& path) { auto getOptions = TGetOptions() .AttributeFilter( TAttributeFilter() .AddAttribute("schema") .AddAttribute("dynamic") - .AddAttribute("type") - ); + .AddAttribute("type")); + // In case of external cluster, we can't use the current transaction + // since it is unknown for the external cluster. + // Hence, we should take a global transaction. + if (path.Cluster_ && !path.Cluster_->empty()) { + return batch->Get(TTransactionId(), path.Path_, getOptions); + } return batch->Get(transactionId, path.Path_, getOptions); }); diff --git a/yt/cpp/mapreduce/client/skiff.h b/yt/cpp/mapreduce/client/skiff.h index 5f26dc7656..3d9189998d 100644 --- a/yt/cpp/mapreduce/client/skiff.h +++ b/yt/cpp/mapreduce/client/skiff.h @@ -60,6 +60,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema); NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( const IRawClientPtr& rawClient, + const TClientContext& context, const TTransactionId& transactionId, ENodeReaderFormat nodeReaderFormat, const TVector<TRichYPath>& tablePaths, diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp index 5df2c40873..ea511398cc 100644 --- a/yt/cpp/mapreduce/client/structured_table_formats.cpp +++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp @@ -1,5 +1,6 @@ #include "structured_table_formats.h" +#include "client.h" #include "format_hints.h" #include "skiff.h" @@ -67,15 +68,23 @@ TMaybe<TNode> GetCommonTableFormat( TMaybe<TNode> GetTableFormat( const IClientRetryPolicyPtr& retryPolicy, const IRawClientPtr& rawClient, + const TClientContext& context, const TTransactionId& transactionId, const TRichYPath& path) { + auto newRawClient = rawClient; + if (path.Cluster_ && !path.Cluster_->empty()) { + auto newContext = context; + NDetail::SetupClusterContext(newContext, *path.Cluster_); + newRawClient = rawClient->Clone(newContext); + } + auto formatPath = path.Path_ + "/@_format"; auto exists = NDetail::RequestWithRetry<bool>( retryPolicy->CreatePolicyForGenericRequest(), - [&rawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) { - return rawClient->Exists(transactionId, formatPath); + [&newRawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) { + return newRawClient->Exists(transactionId, formatPath); }); if (!exists) { return TMaybe<TNode>(); @@ -83,8 +92,8 @@ TMaybe<TNode> GetTableFormat( auto format = NDetail::RequestWithRetry<TMaybe<TNode>>( retryPolicy->CreatePolicyForGenericRequest(), - [&rawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) { - return rawClient->Get(transactionId, formatPath); + [&newRawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) { + return newRawClient->Get(transactionId, formatPath); }); if (format.Get()->AsString() != "yamred_dsv") { return TMaybe<TNode>(); @@ -102,12 +111,13 @@ TMaybe<TNode> GetTableFormat( TMaybe<TNode> GetTableFormats( const IClientRetryPolicyPtr& clientRetryPolicy, const IRawClientPtr& rawClient, + const TClientContext& context, const TTransactionId& transactionId, const TVector<TRichYPath>& inputs) { TVector<TMaybe<TNode>> formats; for (auto& table : inputs) { - formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, transactionId, table)); + formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, context, transactionId, table)); } return GetCommonTableFormat(formats); @@ -121,6 +131,7 @@ namespace NDetail { NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema( const IRawClientPtr& rawClient, + const TClientContext& context, const TTransactionId& transactionId, const TVector<TRichYPath>& tables, const TOperationOptions& options, @@ -134,6 +145,7 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema( } return CreateSkiffSchemaIfNecessary( rawClient, + context, transactionId, nodeReaderFormat, tables, @@ -387,7 +399,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat( Y_ABORT_UNLESS(table.RichYPath, "Cannot use format from table for intermediate table"); tableList.push_back(*table.RichYPath); } - formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, TransactionId_, tableList); + formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, Context_, TransactionId_, tableList); } if (formatFromTableAttributes) { return { @@ -432,6 +444,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat( } skiffSchema = TryCreateSkiffSchema( RawClient_, + Context_, TransactionId_, tableList, OperationOptions_, diff --git a/yt/cpp/mapreduce/client/structured_table_formats.h b/yt/cpp/mapreduce/client/structured_table_formats.h index 64a44c6f4d..c1ceec4066 100644 --- a/yt/cpp/mapreduce/client/structured_table_formats.h +++ b/yt/cpp/mapreduce/client/structured_table_formats.h @@ -22,12 +22,14 @@ TMaybe<TNode> GetCommonTableFormat( TMaybe<TNode> GetTableFormat( const IClientRetryPolicyPtr& clientRetryPolicy, const IRawClientPtr& rawClient, + const TClientContext& context, const TTransactionId& transactionId, const TRichYPath& path); TMaybe<TNode> GetTableFormats( const IClientRetryPolicyPtr& clientRetryPolicy, const IRawClientPtr& rawClient, + const TClientContext& context, const TTransactionId& transactionId, const TVector<TRichYPath>& paths); diff --git a/yt/cpp/mapreduce/http/context.cpp b/yt/cpp/mapreduce/http/context.cpp index 18d564fe09..531950c39e 100644 --- a/yt/cpp/mapreduce/http/context.cpp +++ b/yt/cpp/mapreduce/http/context.cpp @@ -13,7 +13,8 @@ bool operator==(const TClientContext& lhs, const TClientContext& rhs) lhs.HttpClient == rhs.HttpClient && lhs.UseTLS == rhs.UseTLS && lhs.TvmOnly == rhs.TvmOnly && - lhs.ProxyAddress == rhs.ProxyAddress; + lhs.ProxyAddress == rhs.ProxyAddress && + lhs.ProxyRole == rhs.ProxyRole; } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/context.h b/yt/cpp/mapreduce/http/context.h index f50c1b9732..4559cfd587 100644 --- a/yt/cpp/mapreduce/http/context.h +++ b/yt/cpp/mapreduce/http/context.h @@ -22,6 +22,7 @@ struct TClientContext bool UseTLS = false; TConfigPtr Config = TConfig::Get(); TMaybe<TString> ProxyAddress; + TMaybe<TString> ProxyRole; }; bool operator==(const TClientContext& lhs, const TClientContext& rhs); diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp index c86fd2494d..734282c2cf 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.cpp +++ b/yt/cpp/mapreduce/http_client/raw_client.cpp @@ -903,6 +903,11 @@ IRawClientPtr THttpRawClient::Clone() return ::MakeIntrusive<THttpRawClient>(Context_); } +IRawClientPtr THttpRawClient::Clone(const TClientContext& context) +{ + return ::MakeIntrusive<THttpRawClient>(context); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h index 6aa670f524..d292688978 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.h +++ b/yt/cpp/mapreduce/http_client/raw_client.h @@ -339,6 +339,8 @@ public: IRawClientPtr Clone() override; + IRawClientPtr Clone(const TClientContext& context) override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/http_client/raw_requests.h b/yt/cpp/mapreduce/http_client/raw_requests.h index 7273b8ad95..a61f303a62 100644 --- a/yt/cpp/mapreduce/http_client/raw_requests.h +++ b/yt/cpp/mapreduce/http_client/raw_requests.h @@ -4,6 +4,8 @@ #include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/client/client.h> + #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/interface/client.h> @@ -91,6 +93,52 @@ auto BatchTransform( return result; } +template <typename TBatchAdder> +auto RemoteClustersBatchTransform( + const IRawClientPtr& rawClient, + const TClientContext& context, + const TVector<TRichYPath>& paths, + TBatchAdder batchAdder, + const TExecuteBatchOptions& options = {}) +{ + // Given inputs from multiple clusters, we need to categorize them based on their cluster names. + // We assume the current cluster name is empty. Within each cluster, + // gather all related inputs to perform a batch request. + std::unordered_map<TString, std::vector<int>> clusterToPathsIndexes; + for (ssize_t index = 0; index < std::ssize(paths); ++index) { + const auto& path = paths[index]; + auto clusterName = path.Cluster_.GetOrElse(""); + clusterToPathsIndexes[clusterName].push_back(index); + } + + std::vector<TNode> result(paths.size()); + for (const auto& [clusterName, pathsIndexes] : clusterToPathsIndexes) { + auto newContext = context; + if (!clusterName.empty()) { + // It is not a current cluster, we switch the cluster context. + SetupClusterContext(newContext, clusterName); + } + auto newRawClient = rawClient->Clone(newContext); + + TVector<TRichYPath> pathsBatch; + pathsBatch.reserve(pathsIndexes.size()); + for (int index : pathsIndexes) { + pathsBatch.push_back(paths[index]); + } + + auto batchResult = NRawClient::BatchTransform( + newRawClient, + NRawClient::CanonizeYPaths(newRawClient, pathsBatch), + batchAdder, + options); + + for (ssize_t index = 0; index < std::ssize(pathsIndexes); ++index) { + result[pathsIndexes[index]] = batchResult[index]; + } + } + return result; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NDetail::NRawClient diff --git a/yt/cpp/mapreduce/interface/common.cpp b/yt/cpp/mapreduce/interface/common.cpp index 966be8341f..7abbef9127 100644 --- a/yt/cpp/mapreduce/interface/common.cpp +++ b/yt/cpp/mapreduce/interface/common.cpp @@ -552,6 +552,28 @@ TKeyBound::TKeyBound(ERelation relation, TKey key) , Key_(std::move(key)) { } +bool operator==(const TKeyBound& lhs, const TKeyBound& rhs) noexcept +{ + return lhs.Key() == rhs.Key() && lhs.Relation() == rhs.Relation(); +} + +//////////////////////////////////////////////////////////////////////////////// + +bool operator==(const TReadLimit& lhs, const TReadLimit& rhs) noexcept +{ + return lhs.Key_ == rhs.Key_ && lhs.RowIndex_ == rhs.RowIndex_ && + lhs.Offset_ == rhs.Offset_ && lhs.TabletIndex_ == rhs.TabletIndex_ && + lhs.KeyBound_ == rhs.KeyBound_; +} + +//////////////////////////////////////////////////////////////////////////////// + +bool operator==(const TReadRange& lhs, const TReadRange& rhs) noexcept +{ + return lhs.LowerLimit_ == rhs.LowerLimit_ && + lhs.UpperLimit_ == rhs.UpperLimit_ && lhs.Exact_ == rhs.Exact_; +} + //////////////////////////////////////////////////////////////////////////////// TTableSchema CreateTableSchema( diff --git a/yt/cpp/mapreduce/interface/common.h b/yt/cpp/mapreduce/interface/common.h index d595302bbb..9752e15822 100644 --- a/yt/cpp/mapreduce/interface/common.h +++ b/yt/cpp/mapreduce/interface/common.h @@ -880,6 +880,9 @@ struct TKeyBound /// @endcond }; +/// Equality check checks all fields of TKeyBound +bool operator==(const TKeyBound& lhs, const TKeyBound& rhs) noexcept; + /// /// @brief Description of the read limit. /// @@ -923,6 +926,9 @@ struct TReadLimit FLUENT_FIELD_OPTION(i64, TabletIndex); }; +/// Equality check checks all fields of TReadLimit +bool operator==(const TReadLimit& lhs, const TReadLimit& rhs) noexcept; + /// /// @brief Range of a table or a file /// @@ -963,6 +969,9 @@ struct TReadRange } }; +/// Equality check checks all fields of TReadRange +bool operator==(const TReadRange& lhs, const TReadRange& rhs) noexcept; + /// /// @brief Path with additional attributes. /// diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index b97c06ce37..24f8de61b6 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -4,6 +4,8 @@ #include "client_method_options.h" #include "operation.h" +#include <yt/cpp/mapreduce/http/context.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -198,6 +200,7 @@ public: const TGetJobTraceOptions& options = {}) = 0; // Files + virtual std::unique_ptr<IInputStream> ReadFile( const TTransactionId& transactionId, const TRichYPath& path, @@ -332,7 +335,11 @@ public: virtual IRawBatchRequestPtr CreateRawBatchRequest() = 0; + // Other + virtual IRawClientPtr Clone() = 0; + + virtual IRawClientPtr Clone(const TClientContext& context) = 0; }; //////////////////////////////////////////////////////////////////////////////// |