aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorosidorkin <osidorkin@yandex-team.com>2024-08-07 14:16:43 +0300
committerosidorkin <osidorkin@yandex-team.com>2024-08-07 14:49:14 +0300
commit9b8b737f3b9ece767624afde5c161b7fc8d00cf6 (patch)
treed67e9033292e7b6585bbeefc9c85807119037dad
parentc59a2427a037c1a04a24ed46497ab3a298ffa5d1 (diff)
downloadydb-9b8b737f3b9ece767624afde5c161b7fc8d00cf6.tar.gz
YT-22451: request timestamp provider feature for alien requests. Add fallback to direct connection to remote cluster to TRemoteClusterTimestampProvider
4545626d60064b5bb9389f27a1daebe8ea395461
-rw-r--r--yt/yt/client/transaction_client/public.h4
-rw-r--r--yt/yt/client/transaction_client/remote_timestamp_provider.cpp63
-rw-r--r--yt/yt/client/transaction_client/remote_timestamp_provider.h5
3 files changed, 66 insertions, 6 deletions
diff --git a/yt/yt/client/transaction_client/public.h b/yt/yt/client/transaction_client/public.h
index fd3f41b370..c910c3d6ec 100644
--- a/yt/yt/client/transaction_client/public.h
+++ b/yt/yt/client/transaction_client/public.h
@@ -31,6 +31,10 @@ DEFINE_ENUM(ECommitOrdering,
((Strong) (1)) // Rows are appended to tablet in order of timestamps
);
+DEFINE_ENUM(ETimestampProviderFeature,
+ ((AlienClocks) (0))
+);
+
YT_DEFINE_ERROR_ENUM(
((NoSuchTransaction) (11000))
((NestedExternalTransactionExists) (11001))
diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
index 00def063ee..a29cea5ece 100644
--- a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
+++ b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
@@ -66,9 +66,11 @@ class TRemoteTimestampProvider
public:
TRemoteTimestampProvider(
IChannelPtr channel,
- TRemoteTimestampProviderConfigPtr config)
+ TRemoteTimestampProviderConfigPtr config,
+ bool allowOldClocks)
: TTimestampProviderBase(config->LatestTimestampUpdatePeriod)
, Config_(std::move(config))
+ , AllowOldClocks_(allowOldClocks)
, Proxy_(std::move(channel))
{
Proxy_.SetDefaultTimeout(Config_->RpcTimeout);
@@ -76,6 +78,7 @@ public:
private:
const TRemoteTimestampProviderConfigPtr Config_;
+ const bool AllowOldClocks_ = false;
TTimestampServiceProxy Proxy_;
@@ -85,8 +88,13 @@ private:
req->SetResponseHeavy(true);
req->set_count(count);
if (clockClusterTag != InvalidCellTag) {
+ if (!AllowOldClocks_) {
+ req->RequireServerFeature(ETimestampProviderFeature::AlienClocks);
+ }
+
req->set_clock_cluster_tag(ToProto<int>(clockClusterTag));
}
+
return req->Invoke().Apply(
BIND([clockClusterTag] (const TTimestampServiceProxy::TRspGenerateTimestampsPtr& rsp) {
auto responseClockClusterTag = rsp->has_clock_cluster_tag()
@@ -111,28 +119,68 @@ private:
ITimestampProviderPtr CreateRemoteTimestampProvider(
TRemoteTimestampProviderConfigPtr config,
+ IChannelPtr channel,
+ bool allowOldClocks)
+{
+ return New<TRemoteTimestampProvider>(
+ std::move(channel),
+ std::move(config),
+ allowOldClocks);
+}
+
+ITimestampProviderPtr CreateRemoteTimestampProvider(
+ TRemoteTimestampProviderConfigPtr config,
IChannelPtr channel)
{
- return New<TRemoteTimestampProvider>(std::move(channel), std::move(config));
+ return CreateRemoteTimestampProvider(
+ std::move(config),
+ std::move(channel),
+ false);
}
ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
TRemoteTimestampProviderConfigPtr config,
- IChannelPtr channel)
+ IChannelPtr channel,
+ bool allowOldClocks)
{
- auto underlying = CreateRemoteTimestampProvider(config, std::move(channel));
+ auto underlying = CreateRemoteTimestampProvider(
+ config,
+ std::move(channel),
+ allowOldClocks);
return CreateBatchingTimestampProvider(
std::move(underlying),
config->BatchPeriod);
}
ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
+ TRemoteTimestampProviderConfigPtr config,
+ IChannelPtr channel)
+{
+ return CreateBatchingRemoteTimestampProvider(
+ std::move(config),
+ std::move(channel),
+ false);
+}
+
+ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
+ const TRemoteTimestampProviderConfigPtr& config,
+ const IChannelFactoryPtr& channelFactory,
+ bool allowOldClocks)
+{
+ return CreateBatchingRemoteTimestampProvider(
+ config,
+ CreateTimestampProviderChannel(config, channelFactory),
+ allowOldClocks);
+}
+
+ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
const TRemoteTimestampProviderConfigPtr& config,
const IChannelFactoryPtr& channelFactory)
{
return CreateBatchingRemoteTimestampProvider(
config,
- CreateTimestampProviderChannel(config, channelFactory));
+ channelFactory,
+ false);
}
////////////////////////////////////////////////////////////////////////////////
@@ -164,7 +212,10 @@ TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap(
EmplaceOrCrash(
alienProvidersMap,
alienClockCellTag,
- CreateBatchingRemoteTimestampProvider(foreignProviderConfig->TimestampProvider, channelFactory));
+ CreateBatchingRemoteTimestampProvider(
+ foreignProviderConfig->TimestampProvider,
+ channelFactory,
+ true));
}
return alienProvidersMap;
diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.h b/yt/yt/client/transaction_client/remote_timestamp_provider.h
index 41a4a3d707..806f47a5b3 100644
--- a/yt/yt/client/transaction_client/remote_timestamp_provider.h
+++ b/yt/yt/client/transaction_client/remote_timestamp_provider.h
@@ -37,6 +37,11 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
const TRemoteTimestampProviderConfigPtr& config,
const NRpc::IChannelFactoryPtr& channelFactory);
+ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
+ const TRemoteTimestampProviderConfigPtr& config,
+ const NRpc::IChannelFactoryPtr& channelFactory,
+ bool allowOldClocks);
+
TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap(
const std::vector<TAlienTimestampProviderConfigPtr>& configs,
ITimestampProviderPtr nativeProvider,