aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-02-21 14:25:13 +0300
committerGitHub <noreply@github.com>2025-02-21 11:25:13 +0000
commitc20aff24921230b118b1d3ee019bbed8add21403 (patch)
tree80753502ccd11b46e4bbf70aaa40df9f620cf409
parentf6c9ccbd019a074fcfa08155e65aaa759160ea8e (diff)
downloadydb-c20aff24921230b118b1d3ee019bbed8add21403.tar.gz
OltpSink: reattach shard (#14863)
-rw-r--r--ydb/core/kqp/common/kqp_tx_manager.cpp27
-rw-r--r--ydb/core/kqp/common/kqp_tx_manager.h20
-rw-r--r--ydb/core/kqp/common/simple/reattach.cpp44
-rw-r--r--ydb/core/kqp/common/simple/reattach.h15
-rw-r--r--ydb/core/kqp/common/simple/ya.make5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp52
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp103
7 files changed, 217 insertions, 49 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp
index 2b1ced46d1..a23ed7d5c7 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.cpp
+++ b/ydb/core/kqp/common/kqp_tx_manager.cpp
@@ -202,6 +202,29 @@ public:
return locks;
}
+ void Reattached(ui64 shardId) override {
+ auto& shardInfo = ShardsInfo.at(shardId);
+ shardInfo.Reattaching = false;
+ }
+
+ void SetRestarting(ui64 shardId) override {
+ auto& shardInfo = ShardsInfo.at(shardId);
+ shardInfo.Restarting = true;
+ }
+
+ bool ShouldReattach(ui64 shardId, TInstant now) override {
+ auto& shardInfo = ShardsInfo.at(shardId);
+ if (!std::exchange(shardInfo.Restarting, false) && !shardInfo.Reattaching) {
+ return false;
+ }
+ return ::NKikimr::NKqp::ShouldReattach(now, shardInfo.ReattachState.ReattachInfo);;
+ }
+
+ TReattachState& GetReattachState(ui64 shardId) override {
+ auto& shardInfo = ShardsInfo.at(shardId);
+ return shardInfo.ReattachState;
+ }
+
bool IsTxPrepared() const override {
for (const auto& [_, shardInfo] : ShardsInfo) {
if (shardInfo.State != EShardState::PREPARED) {
@@ -454,6 +477,10 @@ private:
bool IsOlap = false;
THashSet<TStringBuf> Pathes;
+
+ bool Restarting = false;
+ bool Reattaching = false;
+ TReattachState ReattachState;
};
void MakeLocksIssue(const TShardInfo& shardInfo) {
diff --git a/ydb/core/kqp/common/kqp_tx_manager.h b/ydb/core/kqp/common/kqp_tx_manager.h
index a80e1a4ce3..18ba29d932 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.h
+++ b/ydb/core/kqp/common/kqp_tx_manager.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/kqp/common/kqp_yql.h>
+#include <ydb/core/kqp/common/simple/reattach.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/core/util/ulid.h>
@@ -43,11 +44,21 @@ public:
virtual bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lock) = 0;
virtual void BreakLock(ui64 shardId) = 0;
+ virtual TVector<NKikimrDataEvents::TLock> GetLocks() const = 0;
+ virtual TVector<NKikimrDataEvents::TLock> GetLocks(ui64 shardId) const = 0;
virtual TTableInfo GetShardTableInfo(ui64 shardId) const = 0;
- virtual TVector<NKikimrDataEvents::TLock> GetLocks() const = 0;
- virtual TVector<NKikimrDataEvents::TLock> GetLocks(ui64 shardId) const = 0;
+ virtual bool ShouldReattach(ui64 shardId, TInstant now) = 0;
+ virtual void Reattached(ui64 shardId) = 0;
+ virtual void SetRestarting(ui64 shardId) = 0;
+
+ struct TReattachState {
+ TReattachInfo ReattachInfo;
+ ui64 Cookie = 0;
+ };
+
+ virtual TReattachState& GetReattachState(ui64 shardId) = 0;
virtual EShardState GetState(ui64 shardId) const = 0;
virtual void SetError(ui64 shardId) = 0;
@@ -57,11 +68,7 @@ public:
virtual void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) = 0;
virtual const NTopic::TTopicOperations& GetTopicOperations() const = 0;
-
- virtual void SetAllowVolatile(bool allowVolatile) = 0;
-
virtual void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) = 0;
-
virtual bool HasTopics() const = 0;
virtual bool IsTxPrepared() const = 0;
@@ -74,6 +81,7 @@ public:
virtual bool IsEmpty() const = 0;
virtual bool HasLocks() const = 0;
+ virtual void SetAllowVolatile(bool allowVolatile) = 0;
virtual bool IsVolatile() const = 0;
virtual bool HasSnapshot() const = 0;
diff --git a/ydb/core/kqp/common/simple/reattach.cpp b/ydb/core/kqp/common/simple/reattach.cpp
new file mode 100644
index 0000000000..0df7f16fd8
--- /dev/null
+++ b/ydb/core/kqp/common/simple/reattach.cpp
@@ -0,0 +1,44 @@
+#include "reattach.h"
+
+#include <ydb/core/base/appdata_fwd.h>
+#include <library/cpp/random_provider/random_provider.h>
+
+namespace NKikimr::NKqp {
+
+namespace {
+ static constexpr TDuration MinReattachDelay = TDuration::MilliSeconds(10);
+ static constexpr TDuration MaxReattachDelay = TDuration::MilliSeconds(100);
+ static constexpr TDuration MaxReattachDuration = TDuration::Seconds(4);
+}
+
+bool ShouldReattach(TInstant now, TReattachInfo& reattachInfo) {
+ if (!reattachInfo.Reattaching) {
+ reattachInfo.Deadline = now + MaxReattachDuration;
+ reattachInfo.Delay = TDuration::Zero();
+ reattachInfo.Reattaching = true;
+ return true;
+ }
+
+ TDuration left = reattachInfo.Deadline - now;
+ if (!left) {
+ reattachInfo.Reattaching = false;
+ return false;
+ }
+
+ reattachInfo.Delay *= 2.0;
+ if (reattachInfo.Delay < MinReattachDelay) {
+ reattachInfo.Delay = MinReattachDelay;
+ } else if (reattachInfo.Delay > MaxReattachDelay) {
+ reattachInfo.Delay = MaxReattachDelay;
+ }
+
+ // Add ±10% jitter
+ reattachInfo.Delay *= 0.9 + 0.2 * TAppData::RandomProvider->GenRandReal4();
+ if (reattachInfo.Delay > left) {
+ reattachInfo.Delay = left;
+ }
+
+ return true;
+}
+
+}
diff --git a/ydb/core/kqp/common/simple/reattach.h b/ydb/core/kqp/common/simple/reattach.h
new file mode 100644
index 0000000000..0016bac676
--- /dev/null
+++ b/ydb/core/kqp/common/simple/reattach.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <util/datetime/base.h>
+
+namespace NKikimr::NKqp {
+
+struct TReattachInfo {
+ TDuration Delay;
+ TInstant Deadline;
+ bool Reattaching = false;
+};
+
+bool ShouldReattach(TInstant now, TReattachInfo& reattachInfo);
+
+}
diff --git a/ydb/core/kqp/common/simple/ya.make b/ydb/core/kqp/common/simple/ya.make
index 48bccc8134..bafd993b53 100644
--- a/ydb/core/kqp/common/simple/ya.make
+++ b/ydb/core/kqp/common/simple/ya.make
@@ -2,10 +2,11 @@ LIBRARY()
SRCS(
helpers.cpp
+ kqp_event_ids.cpp
query_id.cpp
- settings.cpp
+ reattach.cpp
services.cpp
- kqp_event_ids.cpp
+ settings.cpp
temp_tables.cpp
)
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 03a2e2ccc0..614442fd57 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -11,8 +11,9 @@
#include <ydb/core/client/minikql_compile/db_key_resolver.h>
#include <ydb/core/kqp/common/buffer/events.h>
#include <ydb/core/kqp/common/kqp_data_integrity_trails.h>
-#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/common/kqp_tx_manager.h>
+#include <ydb/core/kqp/common/kqp_yql.h>
+#include <ydb/core/kqp/common/simple/reattach.h>
#include <ydb/library/wilson_ids/wilson.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
#include <ydb/core/kqp/common/kqp_tx.h>
@@ -41,9 +42,6 @@ using namespace NLongTxService;
namespace {
-static constexpr TDuration MinReattachDelay = TDuration::MilliSeconds(10);
-static constexpr TDuration MaxReattachDelay = TDuration::MilliSeconds(100);
-static constexpr TDuration MaxReattachDuration = TDuration::Seconds(4);
static constexpr ui32 ReplySizeLimit = 48 * 1024 * 1024; // 48 MB
class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data> {
@@ -51,45 +49,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
using TKqpSnapshot = IKqpGateway::TKqpSnapshot;
struct TReattachState {
- TDuration Delay;
- TInstant Deadline;
+ TReattachInfo ReattachInfo;
ui64 Cookie = 0;
- bool Reattaching = false;
bool ShouldReattach(TInstant now) {
++Cookie; // invalidate any previous cookie
- if (!Reattaching) {
- Deadline = now + MaxReattachDuration;
- Delay = TDuration::Zero();
- Reattaching = true;
- return true;
- }
-
- TDuration left = Deadline - now;
- if (!left) {
- Reattaching = false;
- return false;
- }
-
- Delay *= 2.0;
- if (Delay < MinReattachDelay) {
- Delay = MinReattachDelay;
- } else if (Delay > MaxReattachDelay) {
- Delay = MaxReattachDelay;
- }
-
- // Add ±10% jitter
- Delay *= 0.9 + 0.2 * TAppData::RandomProvider->GenRandReal4();
- if (Delay > left) {
- Delay = left;
- }
-
- return true;
+ return ::NKikimr::NKqp::ShouldReattach(now, ReattachInfo);
}
void Reattached() {
- Reattaching = false;
+ ReattachInfo.Reattaching = false;
}
};
@@ -729,13 +699,13 @@ private:
case TShardState::EState::Prepared: {
// Disconnected while waiting for other shards to prepare
- if ((wasRestarting || shardState->ReattachState.Reattaching) &&
+ if ((wasRestarting || shardState->ReattachState.ReattachInfo.Reattaching) &&
shardState->ReattachState.ShouldReattach(TlsActivationContext->Now()))
{
LOG_N("Shard " << msg->TabletId << " delivery problem (already prepared, reattaching in "
- << shardState->ReattachState.Delay << ")");
+ << shardState->ReattachState.ReattachInfo.Delay << ")");
- Schedule(shardState->ReattachState.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId));
+ Schedule(shardState->ReattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId));
++shardState->RestartCount;
return;
}
@@ -1542,13 +1512,13 @@ private:
// Proceed with query processing
[[fallthrough]];
case TShardState::EState::Executing: {
- if ((wasRestarting || shardState->ReattachState.Reattaching) &&
+ if ((wasRestarting || shardState->ReattachState.ReattachInfo.Reattaching) &&
shardState->ReattachState.ShouldReattach(TlsActivationContext->Now()))
{
LOG_N("Shard " << msg->TabletId << " lost pipe while waiting for reply (reattaching in "
- << shardState->ReattachState.Delay << ")");
+ << shardState->ReattachState.ReattachInfo.Delay << ")");
- Schedule(shardState->ReattachState.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId));
+ Schedule(shardState->ReattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId));
++shardState->RestartCount;
return;
}
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 73d24cddca..10289ad7ad 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -175,6 +175,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
enum EEv {
EvShardRequestTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
EvResolveRequestPlanned,
+ EvReattachToShard,
};
struct TEvShardRequestTimeout : public TEventLocal<TEvShardRequestTimeout, EvShardRequestTimeout> {
@@ -187,6 +188,13 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
struct TEvResolveRequestPlanned : public TEventLocal<TEvResolveRequestPlanned, EvResolveRequestPlanned> {
};
+
+ struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> {
+ const ui64 TabletId;
+
+ explicit TEvReattachToShard(ui64 tabletId)
+ : TabletId(tabletId) {}
+ };
};
enum class EMode {
@@ -370,6 +378,9 @@ public:
hFunc(NKikimr::NEvents::TDataEvents::TEvWriteResult, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
+ hFunc(TEvDataShard::TEvProposeTransactionAttachResult, Handle);
+ hFunc(TEvPrivate::TEvReattachToShard, Handle);
+ hFunc(TEvDataShard::TEvProposeTransactionRestart, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(TEvPrivate::TEvShardRequestTimeout, Handle);
hFunc(TEvPrivate::TEvResolveRequestPlanned, Handle);
@@ -975,6 +986,15 @@ public:
CA_LOG_W("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
if (InconsistentTx) {
RetryShard(ev->Get()->TabletId, std::nullopt);
+ } else if ((TxManager->GetState(ev->Get()->TabletId) == IKqpTransactionManager::PREPARED
+ || TxManager->GetState(ev->Get()->TabletId) == IKqpTransactionManager::EXECUTING)
+ && TxManager->ShouldReattach(ev->Get()->TabletId, TlsActivationContext->Now())) {
+ // Disconnected while waiting for other shards to prepare
+ auto& reattachState = TxManager->GetReattachState(ev->Get()->TabletId);
+ CA_LOG_N("Shard " << ev->Get()->TabletId << " delivery problem (reattaching in "
+ << reattachState.ReattachInfo.Delay << ")");
+
+ Schedule(reattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(ev->Get()->TabletId));
} else {
TxManager->SetError(ev->Get()->TabletId);
if (Mode == EMode::IMMEDIATE_COMMIT) {
@@ -995,6 +1015,89 @@ public:
}
}
+ void Handle(TEvDataShard::TEvProposeTransactionAttachResult::TPtr& ev) {
+ const auto& record = ev->Get()->Record;
+ const ui64 shardId = record.GetTabletId();
+
+ auto& reattachState = TxManager->GetReattachState(shardId);
+ if (reattachState.Cookie != ev->Cookie) {
+ return;
+ }
+
+ const auto shardState = TxManager->GetState(shardId);
+ switch (shardState) {
+ case IKqpTransactionManager::EXECUTING:
+ YQL_ENSURE(Mode == EMode::COMMIT || Mode == EMode::IMMEDIATE_COMMIT);
+ case IKqpTransactionManager::PREPARED:
+ YQL_ENSURE(Mode == EMode::PREPARE);
+ case IKqpTransactionManager::PREPARING:
+ case IKqpTransactionManager::FINISHED:
+ case IKqpTransactionManager::ERROR:
+ case IKqpTransactionManager::PROCESSING:
+ YQL_ENSURE(false);
+ }
+
+ if (record.GetStatus() == NKikimrProto::OK) {
+ // Transaction still exists at this shard
+ CA_LOG_D("Reattached to shard " << shardId);
+ TxManager->Reattached(shardId);
+ return;
+ }
+
+ if (Mode == EMode::PREPARE) {
+ RuntimeError(
+ NYql::NDqProto::StatusIds::UNAVAILABLE,
+ NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
+ TStringBuilder()
+ << "ShardId=" << shardId
+ << " for table '" << TablePath
+ << "': attach transaction failed.");
+ } else {
+ RuntimeError(
+ NYql::NDqProto::StatusIds::UNAVAILABLE,
+ NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
+ TStringBuilder()
+ << "ShardId=" << shardId
+ << " for table '" << TablePath
+ << "': attach transaction failed.");
+ }
+ }
+
+ void Handle(TEvDataShard::TEvProposeTransactionRestart::TPtr& ev) {
+ const auto& record = ev->Get()->Record;
+ const ui64 shardId = record.GetTabletId();
+
+ CA_LOG_D("Got transaction restart event from tabletId: " << shardId);
+
+ switch (TxManager->GetState(shardId)) {
+ case IKqpTransactionManager::EXECUTING: {
+ TxManager->SetRestarting(shardId);
+ return;
+ }
+ case IKqpTransactionManager::FINISHED:
+ case IKqpTransactionManager::ERROR: {
+ return;
+ }
+ case IKqpTransactionManager::PREPARING:
+ case IKqpTransactionManager::PREPARED:
+ case IKqpTransactionManager::PROCESSING: {
+ YQL_ENSURE(false);
+ }
+ }
+ }
+
+ void Handle(TEvPrivate::TEvReattachToShard::TPtr& ev) {
+ const ui64 tabletId = ev->Get()->TabletId;
+ auto& state = TxManager->GetReattachState(tabletId);
+
+ CA_LOG_D("Reattach to shard " << tabletId);
+
+ YQL_ENSURE(TxId);
+ Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(
+ new TEvDataShard::TEvProposeTransactionAttach(tabletId, *TxId),
+ tabletId, /* subscribe */ true), 0, ++state.Cookie);
+ }
+
void Prepare() {
TableWriteActorStateSpan.EndOk();
ResolveAttempts = 0;