about | summary | refs | log | tree | commit | diff | stats
path: root/yt/cpp
diff options
context:
space:
mode:
author Alexander Smirnov <alex@ydb.tech> 2025-03-02 11:21:41 +0000
committer Alexander Smirnov <alex@ydb.tech> 2025-03-02 11:21:41 +0000
commit 8322fb9ff849dc37fa752d5aba04ef9e7ba2a7c9 (patch)
tree 0d722004f839a80a1c4a02aa4dd2704daae5adec /yt/cpp
parent 22b98a26c01070ae980dc5477323d8d4152aabbc (diff)
parent 6678165e016ba474f1b8dd6d49af92b0d46350b9 (diff)
download ydb-8322fb9ff849dc37fa752d5aba04ef9e7ba2a7c9.tar.gz
Merge branch 'rightlib' into merge-libs-250302-1120
Diffstat (limited to 'yt/cpp')
-rw-r--r-- yt/cpp/mapreduce/client/client.cpp 37
-rw-r--r-- yt/cpp/mapreduce/client/client.h 4
-rw-r--r-- yt/cpp/mapreduce/client/client_reader.cpp 2
-rw-r--r-- yt/cpp/mapreduce/client/operation.cpp 19
-rw-r--r-- yt/cpp/mapreduce/client/skiff.cpp 15
-rw-r--r-- yt/cpp/mapreduce/client/skiff.h 1
-rw-r--r-- yt/cpp/mapreduce/client/structured_table_formats.cpp 25
-rw-r--r-- yt/cpp/mapreduce/client/structured_table_formats.h 2
-rw-r--r-- yt/cpp/mapreduce/http/context.cpp 3
-rw-r--r-- yt/cpp/mapreduce/http/context.h 1
-rw-r--r-- yt/cpp/mapreduce/http_client/raw_client.cpp 5
-rw-r--r-- yt/cpp/mapreduce/http_client/raw_client.h 2
-rw-r--r-- yt/cpp/mapreduce/http_client/raw_requests.h 48
-rw-r--r-- yt/cpp/mapreduce/interface/common.cpp 22
-rw-r--r-- yt/cpp/mapreduce/interface/common.h 9
-rw-r--r-- yt/cpp/mapreduce/interface/raw_client.h 7
16 files changed, 173 insertions, 29 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 6e58903308..bf9936a64a 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1496,8 +1496,7 @@ IClientPtr TClient::GetParentClient(bool ignoreGlobalTx)
RawClient_,
Context_,
TTransactionId(),
- ClientRetryPolicy_
- );
+ ClientRetryPolicy_);
} else {
return this;
}
@@ -1510,15 +1509,10 @@ void TClient::CheckShutdown() const
}
}
-TClientContext CreateClientContext(
- const TString& serverName,
- const TCreateClientOptions& options)
+void SetupClusterContext(
+ TClientContext& context,
+ const TString& serverName)
{
- TClientContext context;
- context.Config = options.Config_ ? options.Config_ : TConfig::Get();
- context.TvmOnly = options.TvmOnly_;
- context.ProxyAddress = options.ProxyAddress_;
-
context.ServerName = serverName;
ApplyProxyUrlAliasingRules(context.ServerName);
@@ -1531,9 +1525,8 @@ TClientContext CreateClientContext(
static constexpr char httpUrlSchema[] = "http://";
static constexpr char httpsUrlSchema[] = "https://";
- if (options.UseTLS_) {
- context.UseTLS = *options.UseTLS_;
- } else {
+
+ if (!context.UseTLS) {
context.UseTLS = context.ServerName.StartsWith(httpsUrlSchema);
}
@@ -1555,9 +1548,25 @@ TClientContext CreateClientContext(
if (context.ServerName.find(':') == TString::npos) {
context.ServerName = CreateHostNameWithPort(context.ServerName, context);
}
- if (options.TvmOnly_) {
+ if (context.TvmOnly) {
context.ServerName = Format("tvm.%v", context.ServerName);
}
+}
+
+TClientContext CreateClientContext(
+ const TString& serverName,
+ const TCreateClientOptions& options)
+{
+ TClientContext context;
+ context.Config = options.Config_ ? options.Config_ : TConfig::Get();
+ context.TvmOnly = options.TvmOnly_;
+ context.ProxyAddress = options.ProxyAddress_;
+
+ if (options.UseTLS_) {
+ context.UseTLS = *options.UseTLS_;
+ }
+
+ SetupClusterContext(context, serverName);
if (options.ProxyRole_) {
context.Config->Hosts = "hosts?role=" + *options.ProxyRole_;
diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h
index 9cd650bd2d..ef0741044c 100644
--- a/yt/cpp/mapreduce/client/client.h
+++ b/yt/cpp/mapreduce/client/client.h
@@ -509,6 +509,10 @@ private:
////////////////////////////////////////////////////////////////////////////////
+void SetupClusterContext(
+ TClientContext& context,
+ const TString& serverName);
+
TClientContext CreateClientContext(
const TString& serverName,
const TCreateClientOptions& options);
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index 61369859bc..37d39345f1 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -69,7 +69,7 @@ TClientReader::TClientReader(
if (useFormatFromTableAttributes) {
auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_;
- auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, transactionId2, Path_);
+ auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, Context_, transactionId2, Path_);
if (newFormat) {
Format_->Config = *newFormat;
}
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 76a55b436f..4d0cc07bed 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -220,11 +220,24 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
return tableList;
}
- auto isDynamic = NRawClient::BatchTransform(
+ TVector<TRichYPath> tableListPaths;
+ for (const auto& table: tableList) {
+ Y_ABORT_UNLESS(table.RichYPath, "Cannot get path to apply column filters");
+ tableListPaths.emplace_back(*table.RichYPath);
+ }
+
+ auto isDynamic = NRawClient::RemoteClustersBatchTransform(
preparer.GetClient()->GetRawClient(),
- tableList,
+ preparer.GetContext(),
+ tableListPaths,
[&] (IRawBatchRequestPtr batch, const auto& table) {
- return batch->Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
+ // In case of external cluster, we can't use the current transaction
+ // since it is unknown for the external cluster.
+ // Hence, we should take a global transaction.
+ if (table.Cluster_ && !table.Cluster_->empty()) {
+ return batch->Get(TTransactionId(), table.Path_ + "/@dynamic", TGetOptions());
+ }
+ return batch->Get(preparer.GetTransactionId(), table.Path_ + "/@dynamic", TGetOptions());
});
auto newTableList = tableList;
diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp
index 537a28f059..e9c5ba2680 100644
--- a/yt/cpp/mapreduce/client/skiff.cpp
+++ b/yt/cpp/mapreduce/client/skiff.cpp
@@ -279,6 +279,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
const IRawClientPtr& rawClient,
+ const TClientContext& context,
const TTransactionId& transactionId,
ENodeReaderFormat nodeReaderFormat,
const TVector<TRichYPath>& tablePaths,
@@ -301,17 +302,23 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
}
}
- auto nodes = NRawClient::BatchTransform(
+ auto nodes = RemoteClustersBatchTransform(
rawClient,
- NRawClient::CanonizeYPaths(rawClient, tablePaths),
+ context,
+ tablePaths,
[&] (IRawBatchRequestPtr batch, const TRichYPath& path) {
auto getOptions = TGetOptions()
.AttributeFilter(
TAttributeFilter()
.AddAttribute("schema")
.AddAttribute("dynamic")
- .AddAttribute("type")
- );
+ .AddAttribute("type"));
+ // In case of external cluster, we can't use the current transaction
+ // since it is unknown for the external cluster.
+ // Hence, we should take a global transaction.
+ if (path.Cluster_ && !path.Cluster_->empty()) {
+ return batch->Get(TTransactionId(), path.Path_, getOptions);
+ }
return batch->Get(transactionId, path.Path_, getOptions);
});
diff --git a/yt/cpp/mapreduce/client/skiff.h b/yt/cpp/mapreduce/client/skiff.h
index 5f26dc7656..3d9189998d 100644
--- a/yt/cpp/mapreduce/client/skiff.h
+++ b/yt/cpp/mapreduce/client/skiff.h
@@ -60,6 +60,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema);
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
const IRawClientPtr& rawClient,
+ const TClientContext& context,
const TTransactionId& transactionId,
ENodeReaderFormat nodeReaderFormat,
const TVector<TRichYPath>& tablePaths,
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp
index 5df2c40873..ea511398cc 100644
--- a/yt/cpp/mapreduce/client/structured_table_formats.cpp
+++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp
@@ -1,5 +1,6 @@
#include "structured_table_formats.h"
+#include "client.h"
#include "format_hints.h"
#include "skiff.h"
@@ -67,15 +68,23 @@ TMaybe<TNode> GetCommonTableFormat(
TMaybe<TNode> GetTableFormat(
const IClientRetryPolicyPtr& retryPolicy,
const IRawClientPtr& rawClient,
+ const TClientContext& context,
const TTransactionId& transactionId,
const TRichYPath& path)
{
+ auto newRawClient = rawClient;
+ if (path.Cluster_ && !path.Cluster_->empty()) {
+ auto newContext = context;
+ NDetail::SetupClusterContext(newContext, *path.Cluster_);
+ newRawClient = rawClient->Clone(newContext);
+ }
+
auto formatPath = path.Path_ + "/@_format";
auto exists = NDetail::RequestWithRetry<bool>(
retryPolicy->CreatePolicyForGenericRequest(),
- [&rawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
- return rawClient->Exists(transactionId, formatPath);
+ [&newRawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
+ return newRawClient->Exists(transactionId, formatPath);
});
if (!exists) {
return TMaybe<TNode>();
@@ -83,8 +92,8 @@ TMaybe<TNode> GetTableFormat(
auto format = NDetail::RequestWithRetry<TMaybe<TNode>>(
retryPolicy->CreatePolicyForGenericRequest(),
- [&rawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
- return rawClient->Get(transactionId, formatPath);
+ [&newRawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
+ return newRawClient->Get(transactionId, formatPath);
});
if (format.Get()->AsString() != "yamred_dsv") {
return TMaybe<TNode>();
@@ -102,12 +111,13 @@ TMaybe<TNode> GetTableFormat(
TMaybe<TNode> GetTableFormats(
const IClientRetryPolicyPtr& clientRetryPolicy,
const IRawClientPtr& rawClient,
+ const TClientContext& context,
const TTransactionId& transactionId,
const TVector<TRichYPath>& inputs)
{
TVector<TMaybe<TNode>> formats;
for (auto& table : inputs) {
- formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, transactionId, table));
+ formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, context, transactionId, table));
}
return GetCommonTableFormat(formats);
@@ -121,6 +131,7 @@ namespace NDetail {
NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
const IRawClientPtr& rawClient,
+ const TClientContext& context,
const TTransactionId& transactionId,
const TVector<TRichYPath>& tables,
const TOperationOptions& options,
@@ -134,6 +145,7 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
}
return CreateSkiffSchemaIfNecessary(
rawClient,
+ context,
transactionId,
nodeReaderFormat,
tables,
@@ -387,7 +399,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat(
Y_ABORT_UNLESS(table.RichYPath, "Cannot use format from table for intermediate table");
tableList.push_back(*table.RichYPath);
}
- formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, TransactionId_, tableList);
+ formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, Context_, TransactionId_, tableList);
}
if (formatFromTableAttributes) {
return {
@@ -432,6 +444,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
}
skiffSchema = TryCreateSkiffSchema(
RawClient_,
+ Context_,
TransactionId_,
tableList,
OperationOptions_,
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.h b/yt/cpp/mapreduce/client/structured_table_formats.h
index 64a44c6f4d..c1ceec4066 100644
--- a/yt/cpp/mapreduce/client/structured_table_formats.h
+++ b/yt/cpp/mapreduce/client/structured_table_formats.h
@@ -22,12 +22,14 @@ TMaybe<TNode> GetCommonTableFormat(
TMaybe<TNode> GetTableFormat(
const IClientRetryPolicyPtr& clientRetryPolicy,
const IRawClientPtr& rawClient,
+ const TClientContext& context,
const TTransactionId& transactionId,
const TRichYPath& path);
TMaybe<TNode> GetTableFormats(
const IClientRetryPolicyPtr& clientRetryPolicy,
const IRawClientPtr& rawClient,
+ const TClientContext& context,
const TTransactionId& transactionId,
const TVector<TRichYPath>& paths);
diff --git a/yt/cpp/mapreduce/http/context.cpp b/yt/cpp/mapreduce/http/context.cpp
index 18d564fe09..531950c39e 100644
--- a/yt/cpp/mapreduce/http/context.cpp
+++ b/yt/cpp/mapreduce/http/context.cpp
@@ -13,7 +13,8 @@ bool operator==(const TClientContext& lhs, const TClientContext& rhs)
lhs.HttpClient == rhs.HttpClient &&
lhs.UseTLS == rhs.UseTLS &&
lhs.TvmOnly == rhs.TvmOnly &&
- lhs.ProxyAddress == rhs.ProxyAddress;
+ lhs.ProxyAddress == rhs.ProxyAddress &&
+ lhs.ProxyRole == rhs.ProxyRole;
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/context.h b/yt/cpp/mapreduce/http/context.h
index f50c1b9732..4559cfd587 100644
--- a/yt/cpp/mapreduce/http/context.h
+++ b/yt/cpp/mapreduce/http/context.h
@@ -22,6 +22,7 @@ struct TClientContext
bool UseTLS = false;
TConfigPtr Config = TConfig::Get();
TMaybe<TString> ProxyAddress;
+ TMaybe<TString> ProxyRole;
};
bool operator==(const TClientContext& lhs, const TClientContext& rhs);
diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp
index c86fd2494d..734282c2cf 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/http_client/raw_client.cpp
@@ -903,6 +903,11 @@ IRawClientPtr THttpRawClient::Clone()
return ::MakeIntrusive<THttpRawClient>(Context_);
}
+IRawClientPtr THttpRawClient::Clone(const TClientContext& context)
+{
+ return ::MakeIntrusive<THttpRawClient>(context);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h
index 6aa670f524..d292688978 100644
--- a/yt/cpp/mapreduce/http_client/raw_client.h
+++ b/yt/cpp/mapreduce/http_client/raw_client.h
@@ -339,6 +339,8 @@ public:
IRawClientPtr Clone() override;
+ IRawClientPtr Clone(const TClientContext& context) override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/http_client/raw_requests.h b/yt/cpp/mapreduce/http_client/raw_requests.h
index 7273b8ad95..a61f303a62 100644
--- a/yt/cpp/mapreduce/http_client/raw_requests.h
+++ b/yt/cpp/mapreduce/http_client/raw_requests.h
@@ -4,6 +4,8 @@
#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/client/client.h>
+
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/interface/client.h>
@@ -91,6 +93,52 @@ auto BatchTransform(
return result;
}
+template <typename TBatchAdder>
+auto RemoteClustersBatchTransform(
+ const IRawClientPtr& rawClient,
+ const TClientContext& context,
+ const TVector<TRichYPath>& paths,
+ TBatchAdder batchAdder,
+ const TExecuteBatchOptions& options = {})
+{
+ // Given inputs from multiple clusters, we need to categorize them based on their cluster names.
+ // We assume the current cluster name is empty. Within each cluster,
+ // gather all related inputs to perform a batch request.
+ std::unordered_map<TString, std::vector<int>> clusterToPathsIndexes;
+ for (ssize_t index = 0; index < std::ssize(paths); ++index) {
+ const auto& path = paths[index];
+ auto clusterName = path.Cluster_.GetOrElse("");
+ clusterToPathsIndexes[clusterName].push_back(index);
+ }
+
+ std::vector<TNode> result(paths.size());
+ for (const auto& [clusterName, pathsIndexes] : clusterToPathsIndexes) {
+ auto newContext = context;
+ if (!clusterName.empty()) {
+ // It is not a current cluster, we switch the cluster context.
+ SetupClusterContext(newContext, clusterName);
+ }
+ auto newRawClient = rawClient->Clone(newContext);
+
+ TVector<TRichYPath> pathsBatch;
+ pathsBatch.reserve(pathsIndexes.size());
+ for (int index : pathsIndexes) {
+ pathsBatch.push_back(paths[index]);
+ }
+
+ auto batchResult = NRawClient::BatchTransform(
+ newRawClient,
+ NRawClient::CanonizeYPaths(newRawClient, pathsBatch),
+ batchAdder,
+ options);
+
+ for (ssize_t index = 0; index < std::ssize(pathsIndexes); ++index) {
+ result[pathsIndexes[index]] = batchResult[index];
+ }
+ }
+ return result;
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NDetail::NRawClient
diff --git a/yt/cpp/mapreduce/interface/common.cpp b/yt/cpp/mapreduce/interface/common.cpp
index 966be8341f..7abbef9127 100644
--- a/yt/cpp/mapreduce/interface/common.cpp
+++ b/yt/cpp/mapreduce/interface/common.cpp
@@ -552,6 +552,28 @@ TKeyBound::TKeyBound(ERelation relation, TKey key)
, Key_(std::move(key))
{ }
+bool operator==(const TKeyBound& lhs, const TKeyBound& rhs) noexcept
+{
+ return lhs.Key() == rhs.Key() && lhs.Relation() == rhs.Relation();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+bool operator==(const TReadLimit& lhs, const TReadLimit& rhs) noexcept
+{
+ return lhs.Key_ == rhs.Key_ && lhs.RowIndex_ == rhs.RowIndex_ &&
+ lhs.Offset_ == rhs.Offset_ && lhs.TabletIndex_ == rhs.TabletIndex_ &&
+ lhs.KeyBound_ == rhs.KeyBound_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+bool operator==(const TReadRange& lhs, const TReadRange& rhs) noexcept
+{
+ return lhs.LowerLimit_ == rhs.LowerLimit_ &&
+ lhs.UpperLimit_ == rhs.UpperLimit_ && lhs.Exact_ == rhs.Exact_;
+}
+
////////////////////////////////////////////////////////////////////////////////
TTableSchema CreateTableSchema(
diff --git a/yt/cpp/mapreduce/interface/common.h b/yt/cpp/mapreduce/interface/common.h
index d595302bbb..9752e15822 100644
--- a/yt/cpp/mapreduce/interface/common.h
+++ b/yt/cpp/mapreduce/interface/common.h
@@ -880,6 +880,9 @@ struct TKeyBound
/// @endcond
};
+/// Equality check checks all fields of TKeyBound
+bool operator==(const TKeyBound& lhs, const TKeyBound& rhs) noexcept;
+
///
/// @brief Description of the read limit.
///
@@ -923,6 +926,9 @@ struct TReadLimit
FLUENT_FIELD_OPTION(i64, TabletIndex);
};
+/// Equality check checks all fields of TReadLimit
+bool operator==(const TReadLimit& lhs, const TReadLimit& rhs) noexcept;
+
///
/// @brief Range of a table or a file
///
@@ -963,6 +969,9 @@ struct TReadRange
}
};
+/// Equality check checks all fields of TReadRange
+bool operator==(const TReadRange& lhs, const TReadRange& rhs) noexcept;
+
///
/// @brief Path with additional attributes.
///
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index b97c06ce37..24f8de61b6 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -4,6 +4,8 @@
#include "client_method_options.h"
#include "operation.h"
+#include <yt/cpp/mapreduce/http/context.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -198,6 +200,7 @@ public:
const TGetJobTraceOptions& options = {}) = 0;
// Files
+
virtual std::unique_ptr<IInputStream> ReadFile(
const TTransactionId& transactionId,
const TRichYPath& path,
@@ -332,7 +335,11 @@ public:
virtual IRawBatchRequestPtr CreateRawBatchRequest() = 0;
+ // Other
+
virtual IRawClientPtr Clone() = 0;
+
+ virtual IRawClientPtr Clone(const TClientContext& context) = 0;
};
////////////////////////////////////////////////////////////////////////////////