diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-02-21 14:25:13 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-21 11:25:13 +0000 |
commit | c20aff24921230b118b1d3ee019bbed8add21403 (patch) | |
tree | 80753502ccd11b46e4bbf70aaa40df9f620cf409 | |
parent | f6c9ccbd019a074fcfa08155e65aaa759160ea8e (diff) | |
download | ydb-c20aff24921230b118b1d3ee019bbed8add21403.tar.gz |
OltpSink: reattach shard (#14863)
-rw-r--r-- | ydb/core/kqp/common/kqp_tx_manager.cpp | 27 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_tx_manager.h | 20 | ||||
-rw-r--r-- | ydb/core/kqp/common/simple/reattach.cpp | 44 | ||||
-rw-r--r-- | ydb/core/kqp/common/simple/reattach.h | 15 | ||||
-rw-r--r-- | ydb/core/kqp/common/simple/ya.make | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 52 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 103 |
7 files changed, 217 insertions, 49 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index 2b1ced46d1..a23ed7d5c7 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -202,6 +202,29 @@ public: return locks; } + void Reattached(ui64 shardId) override { + auto& shardInfo = ShardsInfo.at(shardId); + shardInfo.Reattaching = false; + } + + void SetRestarting(ui64 shardId) override { + auto& shardInfo = ShardsInfo.at(shardId); + shardInfo.Restarting = true; + } + + bool ShouldReattach(ui64 shardId, TInstant now) override { + auto& shardInfo = ShardsInfo.at(shardId); + if (!std::exchange(shardInfo.Restarting, false) && !shardInfo.Reattaching) { + return false; + } + return ::NKikimr::NKqp::ShouldReattach(now, shardInfo.ReattachState.ReattachInfo);; + } + + TReattachState& GetReattachState(ui64 shardId) override { + auto& shardInfo = ShardsInfo.at(shardId); + return shardInfo.ReattachState; + } + bool IsTxPrepared() const override { for (const auto& [_, shardInfo] : ShardsInfo) { if (shardInfo.State != EShardState::PREPARED) { @@ -454,6 +477,10 @@ private: bool IsOlap = false; THashSet<TStringBuf> Pathes; + + bool Restarting = false; + bool Reattaching = false; + TReattachState ReattachState; }; void MakeLocksIssue(const TShardInfo& shardInfo) { diff --git a/ydb/core/kqp/common/kqp_tx_manager.h b/ydb/core/kqp/common/kqp_tx_manager.h index a80e1a4ce3..18ba29d932 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.h +++ b/ydb/core/kqp/common/kqp_tx_manager.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/kqp/common/simple/reattach.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> #include <ydb/core/util/ulid.h> @@ -43,11 +44,21 @@ public: virtual bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lock) = 0; virtual void BreakLock(ui64 shardId) = 0; + virtual TVector<NKikimrDataEvents::TLock> GetLocks() const = 0; + virtual TVector<NKikimrDataEvents::TLock> GetLocks(ui64 shardId) const = 0; virtual TTableInfo GetShardTableInfo(ui64 shardId) const = 0; - virtual TVector<NKikimrDataEvents::TLock> GetLocks() const = 0; - virtual TVector<NKikimrDataEvents::TLock> GetLocks(ui64 shardId) const = 0; + virtual bool ShouldReattach(ui64 shardId, TInstant now) = 0; + virtual void Reattached(ui64 shardId) = 0; + virtual void SetRestarting(ui64 shardId) = 0; + + struct TReattachState { + TReattachInfo ReattachInfo; + ui64 Cookie = 0; + }; + + virtual TReattachState& GetReattachState(ui64 shardId) = 0; virtual EShardState GetState(ui64 shardId) const = 0; virtual void SetError(ui64 shardId) = 0; @@ -57,11 +68,7 @@ public: virtual void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) = 0; virtual const NTopic::TTopicOperations& GetTopicOperations() const = 0; - - virtual void SetAllowVolatile(bool allowVolatile) = 0; - virtual void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) = 0; - virtual bool HasTopics() const = 0; virtual bool IsTxPrepared() const = 0; @@ -74,6 +81,7 @@ public: virtual bool IsEmpty() const = 0; virtual bool HasLocks() const = 0; + virtual void SetAllowVolatile(bool allowVolatile) = 0; virtual bool IsVolatile() const = 0; virtual bool HasSnapshot() const = 0; diff --git a/ydb/core/kqp/common/simple/reattach.cpp b/ydb/core/kqp/common/simple/reattach.cpp new file mode 100644 index 0000000000..0df7f16fd8 --- /dev/null +++ b/ydb/core/kqp/common/simple/reattach.cpp @@ -0,0 +1,44 @@ +#include "reattach.h" + +#include <ydb/core/base/appdata_fwd.h> +#include <library/cpp/random_provider/random_provider.h> + +namespace NKikimr::NKqp { + +namespace { + static constexpr TDuration MinReattachDelay = TDuration::MilliSeconds(10); + static constexpr TDuration MaxReattachDelay = TDuration::MilliSeconds(100); + static constexpr TDuration MaxReattachDuration = TDuration::Seconds(4); +} + +bool ShouldReattach(TInstant now, TReattachInfo& reattachInfo) { + if (!reattachInfo.Reattaching) { + reattachInfo.Deadline = now + MaxReattachDuration; + reattachInfo.Delay = TDuration::Zero(); + reattachInfo.Reattaching = true; + return true; + } + + TDuration left = reattachInfo.Deadline - now; + if (!left) { + reattachInfo.Reattaching = false; + return false; + } + + reattachInfo.Delay *= 2.0; + if (reattachInfo.Delay < MinReattachDelay) { + reattachInfo.Delay = MinReattachDelay; + } else if (reattachInfo.Delay > MaxReattachDelay) { + reattachInfo.Delay = MaxReattachDelay; + } + + // Add ±10% jitter + reattachInfo.Delay *= 0.9 + 0.2 * TAppData::RandomProvider->GenRandReal4(); + if (reattachInfo.Delay > left) { + reattachInfo.Delay = left; + } + + return true; +} + +} diff --git a/ydb/core/kqp/common/simple/reattach.h b/ydb/core/kqp/common/simple/reattach.h new file mode 100644 index 0000000000..0016bac676 --- /dev/null +++ b/ydb/core/kqp/common/simple/reattach.h @@ -0,0 +1,15 @@ +#pragma once + +#include <util/datetime/base.h> + +namespace NKikimr::NKqp { + +struct TReattachInfo { + TDuration Delay; + TInstant Deadline; + bool Reattaching = false; +}; + +bool ShouldReattach(TInstant now, TReattachInfo& reattachInfo); + +} diff --git a/ydb/core/kqp/common/simple/ya.make b/ydb/core/kqp/common/simple/ya.make index 48bccc8134..bafd993b53 100644 --- a/ydb/core/kqp/common/simple/ya.make +++ b/ydb/core/kqp/common/simple/ya.make @@ -2,10 +2,11 @@ LIBRARY() SRCS( helpers.cpp + kqp_event_ids.cpp query_id.cpp - settings.cpp + reattach.cpp services.cpp - kqp_event_ids.cpp + settings.cpp temp_tables.cpp ) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 03a2e2ccc0..614442fd57 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -11,8 +11,9 @@ #include <ydb/core/client/minikql_compile/db_key_resolver.h> #include <ydb/core/kqp/common/buffer/events.h> #include <ydb/core/kqp/common/kqp_data_integrity_trails.h> -#include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/common/kqp_tx_manager.h> +#include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/kqp/common/simple/reattach.h> #include <ydb/library/wilson_ids/wilson.h> #include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> #include <ydb/core/kqp/common/kqp_tx.h> @@ -41,9 +42,6 @@ using namespace NLongTxService; namespace { -static constexpr TDuration MinReattachDelay = TDuration::MilliSeconds(10); -static constexpr TDuration MaxReattachDelay = TDuration::MilliSeconds(100); -static constexpr TDuration MaxReattachDuration = TDuration::Seconds(4); static constexpr ui32 ReplySizeLimit = 48 * 1024 * 1024; // 48 MB class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data> { @@ -51,45 +49,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da using TKqpSnapshot = IKqpGateway::TKqpSnapshot; struct TReattachState { - TDuration Delay; - TInstant Deadline; + TReattachInfo ReattachInfo; ui64 Cookie = 0; - bool Reattaching = false; bool ShouldReattach(TInstant now) { ++Cookie; // invalidate any previous cookie - if (!Reattaching) { - Deadline = now + MaxReattachDuration; - Delay = TDuration::Zero(); - Reattaching = true; - return true; - } - - TDuration left = Deadline - now; - if (!left) { - Reattaching = false; - return false; - } - - Delay *= 2.0; - if (Delay < MinReattachDelay) { - Delay = MinReattachDelay; - } else if (Delay > MaxReattachDelay) { - Delay = MaxReattachDelay; - } - - // Add ±10% jitter - Delay *= 0.9 + 0.2 * TAppData::RandomProvider->GenRandReal4(); - if (Delay > left) { - Delay = left; - } - - return true; + return ::NKikimr::NKqp::ShouldReattach(now, ReattachInfo); } void Reattached() { - Reattaching = false; + ReattachInfo.Reattaching = false; } }; @@ -729,13 +699,13 @@ private: case TShardState::EState::Prepared: { // Disconnected while waiting for other shards to prepare - if ((wasRestarting || shardState->ReattachState.Reattaching) && + if ((wasRestarting || shardState->ReattachState.ReattachInfo.Reattaching) && shardState->ReattachState.ShouldReattach(TlsActivationContext->Now())) { LOG_N("Shard " << msg->TabletId << " delivery problem (already prepared, reattaching in " - << shardState->ReattachState.Delay << ")"); + << shardState->ReattachState.ReattachInfo.Delay << ")"); - Schedule(shardState->ReattachState.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId)); + Schedule(shardState->ReattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId)); ++shardState->RestartCount; return; } @@ -1542,13 +1512,13 @@ private: // Proceed with query processing [[fallthrough]]; case TShardState::EState::Executing: { - if ((wasRestarting || shardState->ReattachState.Reattaching) && + if ((wasRestarting || shardState->ReattachState.ReattachInfo.Reattaching) && shardState->ReattachState.ShouldReattach(TlsActivationContext->Now())) { LOG_N("Shard " << msg->TabletId << " lost pipe while waiting for reply (reattaching in " - << shardState->ReattachState.Delay << ")"); + << shardState->ReattachState.ReattachInfo.Delay << ")"); - Schedule(shardState->ReattachState.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId)); + Schedule(shardState->ReattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId)); ++shardState->RestartCount; return; } diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 73d24cddca..10289ad7ad 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -175,6 +175,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> { enum EEv { EvShardRequestTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), EvResolveRequestPlanned, + EvReattachToShard, }; struct TEvShardRequestTimeout : public TEventLocal<TEvShardRequestTimeout, EvShardRequestTimeout> { @@ -187,6 +188,13 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> { struct TEvResolveRequestPlanned : public TEventLocal<TEvResolveRequestPlanned, EvResolveRequestPlanned> { }; + + struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> { + const ui64 TabletId; + + explicit TEvReattachToShard(ui64 tabletId) + : TabletId(tabletId) {} + }; }; enum class EMode { @@ -370,6 +378,9 @@ public: hFunc(NKikimr::NEvents::TDataEvents::TEvWriteResult, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle); + hFunc(TEvDataShard::TEvProposeTransactionAttachResult, Handle); + hFunc(TEvPrivate::TEvReattachToShard, Handle); + hFunc(TEvDataShard::TEvProposeTransactionRestart, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); hFunc(TEvPrivate::TEvShardRequestTimeout, Handle); hFunc(TEvPrivate::TEvResolveRequestPlanned, Handle); @@ -975,6 +986,15 @@ public: CA_LOG_W("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId); if (InconsistentTx) { RetryShard(ev->Get()->TabletId, std::nullopt); + } else if ((TxManager->GetState(ev->Get()->TabletId) == IKqpTransactionManager::PREPARED + || TxManager->GetState(ev->Get()->TabletId) == IKqpTransactionManager::EXECUTING) + && TxManager->ShouldReattach(ev->Get()->TabletId, TlsActivationContext->Now())) { + // Disconnected while waiting for other shards to prepare + auto& reattachState = TxManager->GetReattachState(ev->Get()->TabletId); + CA_LOG_N("Shard " << ev->Get()->TabletId << " delivery problem (reattaching in " + << reattachState.ReattachInfo.Delay << ")"); + + Schedule(reattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(ev->Get()->TabletId)); } else { TxManager->SetError(ev->Get()->TabletId); if (Mode == EMode::IMMEDIATE_COMMIT) { @@ -995,6 +1015,89 @@ public: } } + void Handle(TEvDataShard::TEvProposeTransactionAttachResult::TPtr& ev) { + const auto& record = ev->Get()->Record; + const ui64 shardId = record.GetTabletId(); + + auto& reattachState = TxManager->GetReattachState(shardId); + if (reattachState.Cookie != ev->Cookie) { + return; + } + + const auto shardState = TxManager->GetState(shardId); + switch (shardState) { + case IKqpTransactionManager::EXECUTING: + YQL_ENSURE(Mode == EMode::COMMIT || Mode == EMode::IMMEDIATE_COMMIT); + case IKqpTransactionManager::PREPARED: + YQL_ENSURE(Mode == EMode::PREPARE); + case IKqpTransactionManager::PREPARING: + case IKqpTransactionManager::FINISHED: + case IKqpTransactionManager::ERROR: + case IKqpTransactionManager::PROCESSING: + YQL_ENSURE(false); + } + + if (record.GetStatus() == NKikimrProto::OK) { + // Transaction still exists at this shard + CA_LOG_D("Reattached to shard " << shardId); + TxManager->Reattached(shardId); + return; + } + + if (Mode == EMode::PREPARE) { + RuntimeError( + NYql::NDqProto::StatusIds::UNAVAILABLE, + NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, + TStringBuilder() + << "ShardId=" << shardId + << " for table '" << TablePath + << "': attach transaction failed."); + } else { + RuntimeError( + NYql::NDqProto::StatusIds::UNAVAILABLE, + NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, + TStringBuilder() + << "ShardId=" << shardId + << " for table '" << TablePath + << "': attach transaction failed."); + } + } + + void Handle(TEvDataShard::TEvProposeTransactionRestart::TPtr& ev) { + const auto& record = ev->Get()->Record; + const ui64 shardId = record.GetTabletId(); + + CA_LOG_D("Got transaction restart event from tabletId: " << shardId); + + switch (TxManager->GetState(shardId)) { + case IKqpTransactionManager::EXECUTING: { + TxManager->SetRestarting(shardId); + return; + } + case IKqpTransactionManager::FINISHED: + case IKqpTransactionManager::ERROR: { + return; + } + case IKqpTransactionManager::PREPARING: + case IKqpTransactionManager::PREPARED: + case IKqpTransactionManager::PROCESSING: { + YQL_ENSURE(false); + } + } + } + + void Handle(TEvPrivate::TEvReattachToShard::TPtr& ev) { + const ui64 tabletId = ev->Get()->TabletId; + auto& state = TxManager->GetReattachState(tabletId); + + CA_LOG_D("Reattach to shard " << tabletId); + + YQL_ENSURE(TxId); + Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward( + new TEvDataShard::TEvProposeTransactionAttach(tabletId, *TxId), + tabletId, /* subscribe */ true), 0, ++state.Cookie); + } + void Prepare() { TableWriteActorStateSpan.EndOk(); ResolveAttempts = 0; |