diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-21 15:11:28 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-21 15:22:02 +0300 |
commit | 8501f43e6399db4ced00b6e9664455ca5a5dace9 (patch) | |
tree | c4b6f08d53752b641a8f3bc9a39fc4940aef4b52 | |
parent | fab4d0bc686cd0247381ec1ef0b83a826f7935fe (diff) | |
download | ydb-8501f43e6399db4ced00b6e9664455ca5a5dace9.tar.gz |
Intermediate changes
5 files changed, 92 insertions, 0 deletions
diff --git a/yt/yt/client/transaction_client/config.cpp b/yt/yt/client/transaction_client/config.cpp index 3264e5dcd1..165be900a3 100644 --- a/yt/yt/client/transaction_client/config.cpp +++ b/yt/yt/client/transaction_client/config.cpp @@ -31,4 +31,13 @@ void TRemoteTimestampProviderConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TAlienTimestampProviderConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("clock_cluster_tag", &TThis::ClockClusterTag); + registrar.Parameter("timestamp_provider", &TThis::TimestampProvider) + .DefaultNew(); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NTransactionClient diff --git a/yt/yt/client/transaction_client/config.h b/yt/yt/client/transaction_client/config.h index 9c8c69d6a6..97b291de0c 100644 --- a/yt/yt/client/transaction_client/config.h +++ b/yt/yt/client/transaction_client/config.h @@ -38,4 +38,24 @@ DEFINE_REFCOUNTED_TYPE(TRemoteTimestampProviderConfig) //////////////////////////////////////////////////////////////////////////////// +class TAlienTimestampProviderConfig + : public NYTree::TYsonStruct +{ +public: + //! Clock server cell tag + NObjectClient::TCellTag ClockClusterTag; + + NTransactionClient::TRemoteTimestampProviderConfigPtr TimestampProvider; + + REGISTER_YSON_STRUCT(TAlienTimestampProviderConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TAlienTimestampProviderConfig) + +DECLARE_REFCOUNTED_CLASS(TAlienTimestampProviderConfig) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NTransactionClient diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp index 045620a75e..f1e6107a11 100644 --- a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp +++ b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp @@ -22,6 +22,10 @@ using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// +static const auto& Logger = TransactionClientLogger; + +//////////////////////////////////////////////////////////////////////////////// + IChannelPtr CreateTimestampProviderChannel( TRemoteTimestampProviderConfigPtr config, IChannelFactoryPtr channelFactory) @@ -105,6 +109,50 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( config->BatchPeriod); } +ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( + const TRemoteTimestampProviderConfigPtr& config, + const IChannelFactoryPtr& channelFactory) +{ + return CreateBatchingRemoteTimestampProvider( + config, + CreateTimestampProviderChannel(config, channelFactory)); +} + +//////////////////////////////////////////////////////////////////////////////// + +TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap( + const std::vector<TAlienTimestampProviderConfigPtr>& configs, + ITimestampProviderPtr nativeProvider, + TCellTag nativeProviderClockClusterTag, + const IChannelFactoryPtr& channelFactory) +{ + TAlienRemoteTimestampProvidersMap alienProvidersMap; + + if (nativeProviderClockClusterTag == InvalidCellTag) { + return alienProvidersMap; + } + + alienProvidersMap.reserve(configs.size() + 1); + EmplaceOrCrash(alienProvidersMap, nativeProviderClockClusterTag, std::move(nativeProvider)); + + for (const auto& foreignProviderConfig : configs) { + auto alienClockCellTag = foreignProviderConfig->ClockClusterTag; + + if(alienProvidersMap.contains(alienClockCellTag)) { + YT_LOG_ALERT("Duplicate entry for alien clock clusters %d", + alienClockCellTag); + continue; + } + + EmplaceOrCrash( + alienProvidersMap, + alienClockCellTag, + CreateBatchingRemoteTimestampProvider(foreignProviderConfig->TimestampProvider, channelFactory)); + } + + return alienProvidersMap; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NTransactionClient diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.h b/yt/yt/client/transaction_client/remote_timestamp_provider.h index 91a7179e92..41a4a3d707 100644 --- a/yt/yt/client/transaction_client/remote_timestamp_provider.h +++ b/yt/yt/client/transaction_client/remote_timestamp_provider.h @@ -2,12 +2,16 @@ #include "public.h" +#include <yt/yt/client/transaction_client/config.h> + #include <yt/yt/core/rpc/public.h> namespace NYT::NTransactionClient { //////////////////////////////////////////////////////////////////////////////// +using TAlienRemoteTimestampProvidersMap = THashMap<NObjectClient::TCellTag, ITimestampProviderPtr>; + NRpc::IChannelPtr CreateTimestampProviderChannel( TRemoteTimestampProviderConfigPtr config, NRpc::IChannelFactoryPtr channelFactory); @@ -29,6 +33,16 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( TRemoteTimestampProviderConfigPtr config, NRpc::IChannelPtr channel); +ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( + const TRemoteTimestampProviderConfigPtr& config, + const NRpc::IChannelFactoryPtr& channelFactory); + +TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap( + const std::vector<TAlienTimestampProviderConfigPtr>& configs, + ITimestampProviderPtr nativeProvider, + NObjectClient::TCellTag nativeProviderClockClusterTag, + const NRpc::IChannelFactoryPtr& channelFactory); + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NTransactionClient diff --git a/yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto b/yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto index 792e964de3..7bc9ec79a8 100644 --- a/yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto +++ b/yt/yt_proto/yt/client/transaction_client/proto/timestamp_service.proto @@ -5,6 +5,7 @@ package NYT.NTransactionClient.NProto; message TReqGenerateTimestamps { optional int32 count = 1 [default = 1]; + optional int32 clock_cluster_tag = 2; } message TRspGenerateTimestamps |