summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yt/yt/client/transaction_client/public.h4
-rw-r--r--yt/yt/client/transaction_client/remote_timestamp_provider.cpp63
-rw-r--r--yt/yt/client/transaction_client/remote_timestamp_provider.h5
3 files changed, 66 insertions, 6 deletions
diff --git a/yt/yt/client/transaction_client/public.h b/yt/yt/client/transaction_client/public.h
index fd3f41b3705..c910c3d6ecb 100644
--- a/yt/yt/client/transaction_client/public.h
+++ b/yt/yt/client/transaction_client/public.h
@@ -31,6 +31,10 @@ DEFINE_ENUM(ECommitOrdering,
((Strong) (1)) // Rows are appended to tablet in order of timestamps
);
+DEFINE_ENUM(ETimestampProviderFeature,
+ ((AlienClocks) (0))
+);
+
YT_DEFINE_ERROR_ENUM(
((NoSuchTransaction) (11000))
((NestedExternalTransactionExists) (11001))
diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
index 00def063ee9..a29cea5ecea 100644
--- a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
+++ b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
@@ -66,9 +66,11 @@ class TRemoteTimestampProvider
public:
TRemoteTimestampProvider(
IChannelPtr channel,
- TRemoteTimestampProviderConfigPtr config)
+ TRemoteTimestampProviderConfigPtr config,
+ bool allowOldClocks)
: TTimestampProviderBase(config->LatestTimestampUpdatePeriod)
, Config_(std::move(config))
+ , AllowOldClocks_(allowOldClocks)
, Proxy_(std::move(channel))
{
Proxy_.SetDefaultTimeout(Config_->RpcTimeout);
@@ -76,6 +78,7 @@ public:
private:
const TRemoteTimestampProviderConfigPtr Config_;
+ const bool AllowOldClocks_ = false;
TTimestampServiceProxy Proxy_;
@@ -85,8 +88,13 @@ private:
req->SetResponseHeavy(true);
req->set_count(count);
if (clockClusterTag != InvalidCellTag) {
+ if (!AllowOldClocks_) {
+ req->RequireServerFeature(ETimestampProviderFeature::AlienClocks);
+ }
+
req->set_clock_cluster_tag(ToProto<int>(clockClusterTag));
}
+
return req->Invoke().Apply(
BIND([clockClusterTag] (const TTimestampServiceProxy::TRspGenerateTimestampsPtr& rsp) {
auto responseClockClusterTag = rsp->has_clock_cluster_tag()
@@ -111,28 +119,68 @@ private:
ITimestampProviderPtr CreateRemoteTimestampProvider(
TRemoteTimestampProviderConfigPtr config,
+ IChannelPtr channel,
+ bool allowOldClocks)
+{
+ return New<TRemoteTimestampProvider>(
+ std::move(channel),
+ std::move(config),
+ allowOldClocks);
+}
+
+ITimestampProviderPtr CreateRemoteTimestampProvider(
+ TRemoteTimestampProviderConfigPtr config,
IChannelPtr channel)
{
- return New<TRemoteTimestampProvider>(std::move(channel), std::move(config));
+ return CreateRemoteTimestampProvider(
+ std::move(config),
+ std::move(channel),
+ false);
}
ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
TRemoteTimestampProviderConfigPtr config,
- IChannelPtr channel)
+ IChannelPtr channel,
+ bool allowOldClocks)
{
- auto underlying = CreateRemoteTimestampProvider(config, std::move(channel));
+ auto underlying = CreateRemoteTimestampProvider(
+ config,
+ std::move(channel),
+ allowOldClocks);
return CreateBatchingTimestampProvider(
std::move(underlying),
config->BatchPeriod);
}
ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
+ TRemoteTimestampProviderConfigPtr config,
+ IChannelPtr channel)
+{
+ return CreateBatchingRemoteTimestampProvider(
+ std::move(config),
+ std::move(channel),
+ false);
+}
+
+ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
+ const TRemoteTimestampProviderConfigPtr& config,
+ const IChannelFactoryPtr& channelFactory,
+ bool allowOldClocks)
+{
+ return CreateBatchingRemoteTimestampProvider(
+ config,
+ CreateTimestampProviderChannel(config, channelFactory),
+ allowOldClocks);
+}
+
+ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
const TRemoteTimestampProviderConfigPtr& config,
const IChannelFactoryPtr& channelFactory)
{
return CreateBatchingRemoteTimestampProvider(
config,
- CreateTimestampProviderChannel(config, channelFactory));
+ channelFactory,
+ false);
}
////////////////////////////////////////////////////////////////////////////////
@@ -164,7 +212,10 @@ TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap(
EmplaceOrCrash(
alienProvidersMap,
alienClockCellTag,
- CreateBatchingRemoteTimestampProvider(foreignProviderConfig->TimestampProvider, channelFactory));
+ CreateBatchingRemoteTimestampProvider(
+ foreignProviderConfig->TimestampProvider,
+ channelFactory,
+ true));
}
return alienProvidersMap;
diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.h b/yt/yt/client/transaction_client/remote_timestamp_provider.h
index 41a4a3d707d..806f47a5b3a 100644
--- a/yt/yt/client/transaction_client/remote_timestamp_provider.h
+++ b/yt/yt/client/transaction_client/remote_timestamp_provider.h
@@ -37,6 +37,11 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
const TRemoteTimestampProviderConfigPtr& config,
const NRpc::IChannelFactoryPtr& channelFactory);
+ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
+ const TRemoteTimestampProviderConfigPtr& config,
+ const NRpc::IChannelFactoryPtr& channelFactory,
+ bool allowOldClocks);
+
TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap(
const std::vector<TAlienTimestampProviderConfigPtr>& configs,
ITimestampProviderPtr nativeProvider,