diff options
author | osidorkin <osidorkin@yandex-team.com> | 2024-08-07 14:16:43 +0300 |
---|---|---|
committer | osidorkin <osidorkin@yandex-team.com> | 2024-08-07 14:49:14 +0300 |
commit | 9b8b737f3b9ece767624afde5c161b7fc8d00cf6 (patch) | |
tree | d67e9033292e7b6585bbeefc9c85807119037dad | |
parent | c59a2427a037c1a04a24ed46497ab3a298ffa5d1 (diff) | |
download | ydb-9b8b737f3b9ece767624afde5c161b7fc8d00cf6.tar.gz |
YT-22451: request timestamp provider feature for alien requests. Add fallback to direct connection to remote cluster to TRemoteClusterTimestampProvider
4545626d60064b5bb9389f27a1daebe8ea395461
-rw-r--r-- | yt/yt/client/transaction_client/public.h | 4 | ||||
-rw-r--r-- | yt/yt/client/transaction_client/remote_timestamp_provider.cpp | 63 | ||||
-rw-r--r-- | yt/yt/client/transaction_client/remote_timestamp_provider.h | 5 |
3 files changed, 66 insertions, 6 deletions
diff --git a/yt/yt/client/transaction_client/public.h b/yt/yt/client/transaction_client/public.h index fd3f41b370..c910c3d6ec 100644 --- a/yt/yt/client/transaction_client/public.h +++ b/yt/yt/client/transaction_client/public.h @@ -31,6 +31,10 @@ DEFINE_ENUM(ECommitOrdering, ((Strong) (1)) // Rows are appended to tablet in order of timestamps ); +DEFINE_ENUM(ETimestampProviderFeature, + ((AlienClocks) (0)) +); + YT_DEFINE_ERROR_ENUM( ((NoSuchTransaction) (11000)) ((NestedExternalTransactionExists) (11001)) diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp index 00def063ee..a29cea5ece 100644 --- a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp +++ b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp @@ -66,9 +66,11 @@ class TRemoteTimestampProvider public: TRemoteTimestampProvider( IChannelPtr channel, - TRemoteTimestampProviderConfigPtr config) + TRemoteTimestampProviderConfigPtr config, + bool allowOldClocks) : TTimestampProviderBase(config->LatestTimestampUpdatePeriod) , Config_(std::move(config)) + , AllowOldClocks_(allowOldClocks) , Proxy_(std::move(channel)) { Proxy_.SetDefaultTimeout(Config_->RpcTimeout); @@ -76,6 +78,7 @@ public: private: const TRemoteTimestampProviderConfigPtr Config_; + const bool AllowOldClocks_ = false; TTimestampServiceProxy Proxy_; @@ -85,8 +88,13 @@ private: req->SetResponseHeavy(true); req->set_count(count); if (clockClusterTag != InvalidCellTag) { + if (!AllowOldClocks_) { + req->RequireServerFeature(ETimestampProviderFeature::AlienClocks); + } + req->set_clock_cluster_tag(ToProto<int>(clockClusterTag)); } + return req->Invoke().Apply( BIND([clockClusterTag] (const TTimestampServiceProxy::TRspGenerateTimestampsPtr& rsp) { auto responseClockClusterTag = rsp->has_clock_cluster_tag() @@ -111,28 +119,68 @@ private: ITimestampProviderPtr CreateRemoteTimestampProvider( TRemoteTimestampProviderConfigPtr config, + IChannelPtr channel, + bool allowOldClocks) +{ + return New<TRemoteTimestampProvider>( + std::move(channel), + std::move(config), + allowOldClocks); +} + +ITimestampProviderPtr CreateRemoteTimestampProvider( + TRemoteTimestampProviderConfigPtr config, IChannelPtr channel) { - return New<TRemoteTimestampProvider>(std::move(channel), std::move(config)); + return CreateRemoteTimestampProvider( + std::move(config), + std::move(channel), + false); } ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( TRemoteTimestampProviderConfigPtr config, - IChannelPtr channel) + IChannelPtr channel, + bool allowOldClocks) { - auto underlying = CreateRemoteTimestampProvider(config, std::move(channel)); + auto underlying = CreateRemoteTimestampProvider( + config, + std::move(channel), + allowOldClocks); return CreateBatchingTimestampProvider( std::move(underlying), config->BatchPeriod); } ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( + TRemoteTimestampProviderConfigPtr config, + IChannelPtr channel) +{ + return CreateBatchingRemoteTimestampProvider( + std::move(config), + std::move(channel), + false); +} + +ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( + const TRemoteTimestampProviderConfigPtr& config, + const IChannelFactoryPtr& channelFactory, + bool allowOldClocks) +{ + return CreateBatchingRemoteTimestampProvider( + config, + CreateTimestampProviderChannel(config, channelFactory), + allowOldClocks); +} + +ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( const TRemoteTimestampProviderConfigPtr& config, const IChannelFactoryPtr& channelFactory) { return CreateBatchingRemoteTimestampProvider( config, - CreateTimestampProviderChannel(config, channelFactory)); + channelFactory, + false); } //////////////////////////////////////////////////////////////////////////////// @@ -164,7 +212,10 @@ TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap( EmplaceOrCrash( alienProvidersMap, alienClockCellTag, - CreateBatchingRemoteTimestampProvider(foreignProviderConfig->TimestampProvider, channelFactory)); + CreateBatchingRemoteTimestampProvider( + foreignProviderConfig->TimestampProvider, + channelFactory, + true)); } return alienProvidersMap; diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.h b/yt/yt/client/transaction_client/remote_timestamp_provider.h index 41a4a3d707..806f47a5b3 100644 --- a/yt/yt/client/transaction_client/remote_timestamp_provider.h +++ b/yt/yt/client/transaction_client/remote_timestamp_provider.h @@ -37,6 +37,11 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( const TRemoteTimestampProviderConfigPtr& config, const NRpc::IChannelFactoryPtr& channelFactory); +ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( + const TRemoteTimestampProviderConfigPtr& config, + const NRpc::IChannelFactoryPtr& channelFactory, + bool allowOldClocks); + TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap( const std::vector<TAlienTimestampProviderConfigPtr>& configs, ITimestampProviderPtr nativeProvider, |