aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-02-21 15:11:28 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-02-21 15:22:02 +0300
commit8501f43e6399db4ced00b6e9664455ca5a5dace9 (patch)
treec4b6f08d53752b641a8f3bc9a39fc4940aef4b52
parentfab4d0bc686cd0247381ec1ef0b83a826f7935fe (diff)
downloadydb-8501f43e6399db4ced00b6e9664455ca5a5dace9.tar.gz
Intermediate changes
-rw-r--r--yt/yt/client/transaction_client/config.cpp9
-rw-r--r--yt/yt/client/transaction_client/config.h20
-rw-r--r--yt/yt/client/transaction_client/remote_timestamp_provider.cpp48
-rw-r--r--yt/yt/client/transaction_client/remote_timestamp_provider.h14
-rw-r--r--yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto1
5 files changed, 92 insertions, 0 deletions
diff --git a/yt/yt/client/transaction_client/config.cpp b/yt/yt/client/transaction_client/config.cpp
index 3264e5dcd1..165be900a3 100644
--- a/yt/yt/client/transaction_client/config.cpp
+++ b/yt/yt/client/transaction_client/config.cpp
@@ -31,4 +31,13 @@ void TRemoteTimestampProviderConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TAlienTimestampProviderConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("clock_cluster_tag", &TThis::ClockClusterTag);
+ registrar.Parameter("timestamp_provider", &TThis::TimestampProvider)
+ .DefaultNew();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NTransactionClient
diff --git a/yt/yt/client/transaction_client/config.h b/yt/yt/client/transaction_client/config.h
index 9c8c69d6a6..97b291de0c 100644
--- a/yt/yt/client/transaction_client/config.h
+++ b/yt/yt/client/transaction_client/config.h
@@ -38,4 +38,24 @@ DEFINE_REFCOUNTED_TYPE(TRemoteTimestampProviderConfig)
////////////////////////////////////////////////////////////////////////////////
+class TAlienTimestampProviderConfig
+ : public NYTree::TYsonStruct
+{
+public:
+ //! Clock server cell tag
+ NObjectClient::TCellTag ClockClusterTag;
+
+ NTransactionClient::TRemoteTimestampProviderConfigPtr TimestampProvider;
+
+ REGISTER_YSON_STRUCT(TAlienTimestampProviderConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TAlienTimestampProviderConfig)
+
+DECLARE_REFCOUNTED_CLASS(TAlienTimestampProviderConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NTransactionClient
diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
index 045620a75e..f1e6107a11 100644
--- a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
+++ b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
@@ -22,6 +22,10 @@ using namespace NConcurrency;
////////////////////////////////////////////////////////////////////////////////
+static const auto& Logger = TransactionClientLogger;
+
+////////////////////////////////////////////////////////////////////////////////
+
IChannelPtr CreateTimestampProviderChannel(
TRemoteTimestampProviderConfigPtr config,
IChannelFactoryPtr channelFactory)
@@ -105,6 +109,50 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
config->BatchPeriod);
}
+ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
+ const TRemoteTimestampProviderConfigPtr& config,
+ const IChannelFactoryPtr& channelFactory)
+{
+ return CreateBatchingRemoteTimestampProvider(
+ config,
+ CreateTimestampProviderChannel(config, channelFactory));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap(
+ const std::vector<TAlienTimestampProviderConfigPtr>& configs,
+ ITimestampProviderPtr nativeProvider,
+ TCellTag nativeProviderClockClusterTag,
+ const IChannelFactoryPtr& channelFactory)
+{
+ TAlienRemoteTimestampProvidersMap alienProvidersMap;
+
+ if (nativeProviderClockClusterTag == InvalidCellTag) {
+ return alienProvidersMap;
+ }
+
+ alienProvidersMap.reserve(configs.size() + 1);
+ EmplaceOrCrash(alienProvidersMap, nativeProviderClockClusterTag, std::move(nativeProvider));
+
+ for (const auto& foreignProviderConfig : configs) {
+ auto alienClockCellTag = foreignProviderConfig->ClockClusterTag;
+
+ if(alienProvidersMap.contains(alienClockCellTag)) {
+ YT_LOG_ALERT("Duplicate entry for alien clock clusters %d",
+ alienClockCellTag);
+ continue;
+ }
+
+ EmplaceOrCrash(
+ alienProvidersMap,
+ alienClockCellTag,
+ CreateBatchingRemoteTimestampProvider(foreignProviderConfig->TimestampProvider, channelFactory));
+ }
+
+ return alienProvidersMap;
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NTransactionClient
diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.h b/yt/yt/client/transaction_client/remote_timestamp_provider.h
index 91a7179e92..41a4a3d707 100644
--- a/yt/yt/client/transaction_client/remote_timestamp_provider.h
+++ b/yt/yt/client/transaction_client/remote_timestamp_provider.h
@@ -2,12 +2,16 @@
#include "public.h"
+#include <yt/yt/client/transaction_client/config.h>
+
#include <yt/yt/core/rpc/public.h>
namespace NYT::NTransactionClient {
////////////////////////////////////////////////////////////////////////////////
+using TAlienRemoteTimestampProvidersMap = THashMap<NObjectClient::TCellTag, ITimestampProviderPtr>;
+
NRpc::IChannelPtr CreateTimestampProviderChannel(
TRemoteTimestampProviderConfigPtr config,
NRpc::IChannelFactoryPtr channelFactory);
@@ -29,6 +33,16 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
TRemoteTimestampProviderConfigPtr config,
NRpc::IChannelPtr channel);
+ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
+ const TRemoteTimestampProviderConfigPtr& config,
+ const NRpc::IChannelFactoryPtr& channelFactory);
+
+TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap(
+ const std::vector<TAlienTimestampProviderConfigPtr>& configs,
+ ITimestampProviderPtr nativeProvider,
+ NObjectClient::TCellTag nativeProviderClockClusterTag,
+ const NRpc::IChannelFactoryPtr& channelFactory);
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NTransactionClient
diff --git a/yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto b/yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto
index 792e964de3..7bc9ec79a8 100644
--- a/yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto
+++ b/yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto
@@ -5,6 +5,7 @@ package NYT.NTransactionClient.NProto;
message TReqGenerateTimestamps
{
optional int32 count = 1 [default = 1];
+ optional int32 clock_cluster_tag = 2;
}
message TRspGenerateTimestamps