diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-01-12 20:24:39 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-01-12 20:24:39 +0300 |
commit | 8779e0af5f7fe453a9c4e6cc1eea9ca0c7353352 (patch) | |
tree | a1a4bcfef52639fbdec41ff509810b1e94b68aee | |
parent | 5204dcae729d33a7269a03a55b07b81f9a5910f5 (diff) | |
download | ydb-8779e0af5f7fe453a9c4e6cc1eea9ca0c7353352.tar.gz |
Abort immediate tx in case of client lost:
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 52 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 14 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_query_ut.cpp | 13 |
3 files changed, 65 insertions, 14 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index f4dc9c903b..eff1912d7c 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -155,13 +155,15 @@ public: for (const auto& x : ShardStates) { if (x.second.State != TShardState::EState::Finished) { notFinished++; - LOG_D("Datashard " << x.first << " not finished yet: " << ToString(x.second.State)); + LOG_D("ActorState: " << CurrentStateFuncName() + << ", datashard " << x.first << " not finished yet: " << ToString(x.second.State)); } } for (const auto& x : TopicTabletStates) { if (x.second.State != TShardState::EState::Finished) { ++notFinished; - LOG_D("TopicTablet " << x.first << " not finished yet: " << ToString(x.second.State)); + LOG_D("ActorState: " << CurrentStateFuncName() + << ", topicTablet " << x.first << " not finished yet: " << ToString(x.second.State)); } } if (notFinished == 0 && TBase::CheckExecutionComplete()) { @@ -169,7 +171,8 @@ public: } if (IsDebugLogEnabled()) { - auto sb = TStringBuilder() << "Waiting for " << PendingComputeActors.size() << " compute actor(s) and " + auto sb = TStringBuilder() << "ActorState: " << CurrentStateFuncName() + << ", waiting for " << PendingComputeActors.size() << " compute actor(s) and " << notFinished << " datashard(s): "; for (const auto& shardId : PendingComputeActors) { sb << "CA " << shardId.first << ", "; @@ -270,6 +273,21 @@ public: } private: + TString CurrentStateFuncName() const override { + const auto& func = CurrentStateFunc(); + if (func == &TThis::PrepareState) { + return "PrepareState"; + } else if (func == &TThis::ExecuteState) { + return "ExecuteState"; + } else if (func == &TThis::WaitSnapshotState) { + return "WaitSnapshotState"; + } else if (func == &TThis::WaitResolveState) { + return "WaitResolveState"; + } else { + return TBase::CurrentStateFuncName(); + } + } + STATEFN(PrepareState) { try { switch (ev->GetTypeRewrite()) { @@ -459,8 +477,13 @@ private: void CancelProposal(ui64 exceptShardId) { for (auto& [shardId, state] : ShardStates) { if (shardId != exceptShardId && - (state.State == TShardState::EState::Preparing || state.State == TShardState::EState::Prepared)) + (state.State == TShardState::EState::Preparing + || state.State == TShardState::EState::Prepared + || (state.State == TShardState::EState::Executing && ImmediateTx))) { + ui64 id = shardId; + LOG_D("Send CancelTransactionProposal to shard: " << id); + state.State = TShardState::EState::Finished; YQL_ENSURE(!state.DatashardState->Follower); @@ -746,7 +769,7 @@ private: hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute); hFunc(TEvDqCompute::TEvState, HandleComputeStats); hFunc(TEvDqCompute::TEvChannelData, HandleExecute); - hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); + hFunc(TEvKqp::TEvAbortExecution, HandleExecute); hFunc(TEvents::TEvWakeup, HandleTimeout); IgnoreFunc(TEvInterconnect::TEvNodeConnected); default: @@ -786,6 +809,13 @@ private: } } + void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) { + if (ImmediateTx) { + CancelProposal(0); + } + TBase::HandleAbortExecution(ev); + } + void HandleExecute(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) { TEvDataShard::TEvProposeTransactionResult* res = ev->Get(); const ui64 shardId = res->GetOrigin(); @@ -1216,7 +1246,8 @@ private: for (auto& shardTask : shardTasks) { auto& task = TasksGraph.GetTask(shardTask.second); - LOG_D("Stage " << stageInfo.Id << " create datashard task: " << shardTask.second + LOG_D("ActorState: " << CurrentStateFuncName() + << ", stage: " << stageInfo.Id << " create datashard task: " << shardTask.second << ", shard: " << shardTask.first << ", meta: " << task.Meta.ToString(keyTypes, *AppData()->TypeRegistry)); } @@ -1318,7 +1349,8 @@ private: auto locksCount = dataTransaction.GetKqpTransaction().GetLocks().LocksSize(); shardState.DatashardState->ShardReadLocks = locksCount > 0; - LOG_D("Executing KQP transaction on shard: " << shardId + LOG_D("State: " << CurrentStateFuncName() + << ", Executing KQP transaction on shard: " << shardId << ", tasks: [" << JoinStrings(shardState.TaskIds.begin(), shardState.TaskIds.end(), ",") << "]" << ", lockTxId: " << lockTxId << ", locks: " << dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString()); @@ -1765,14 +1797,16 @@ private: ExecuteTasks(); if (ImmediateTx) { - LOG_T("Immediate tx, become ExecuteState"); + LOG_D("ActorState: " << CurrentStateFuncName() + << ", immediate tx, become ExecuteState"); Become(&TKqpDataExecuter::ExecuteState); if (ExecuterStateSpan) { ExecuterStateSpan.End(); ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END); } } else { - LOG_T("Not immediate tx, become PrepareState"); + LOG_D("ActorState: " << CurrentStateFuncName() + << ", not immediate tx, become PrepareState"); Become(&TKqpDataExecuter::PrepareState); if (ExecuterStateSpan) { ExecuterStateSpan.End(); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 08edda9e27..388d794d0f 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -181,7 +181,8 @@ protected: auto& state = ev->Get()->Record; ui64 taskId = state.GetTaskId(); - LOG_D("Got execution state from compute actor: " << computeActor + LOG_D("ActorState: " << CurrentStateFuncName() + << ", got execution state from compute actor: " << computeActor << ", task: " << taskId << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()) << ", stats: " << state.GetStats()); @@ -1015,6 +1016,17 @@ protected: } protected: + virtual TString CurrentStateFuncName() const { + const auto& func = this->CurrentStateFunc(); + if (func == &TKqpExecuterBase::ZombieState) { + return "ZombieState"; + } else if (func == &TKqpExecuterBase::ReadyState) { + return "ReadyState"; + } else { + return "unknown state"; + } + } + TString DebugString() const { TStringBuilder sb; sb << "[KqpExecuter], type: " << (ExecType == EExecType::Data ? "Data" : "Scan") diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 91c1f4b7ba..5de239abee 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -21,7 +21,6 @@ static void CheckStatusAfterTimeout(TSession& session, const TString& query, con const TInstant start = TInstant::Now(); while (true) { auto result = session.ExecuteDataQuery(query, txControl).ExtractValueSync(); - Cerr << "STATUS: " << result.GetStatus() << " " << result.GetIssues().ToString() << Endl; UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString()); if (result.GetStatus() == EStatus::SUCCESS) { break; @@ -300,8 +299,14 @@ Y_UNIT_TEST_SUITE(KqpQuery) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, true, result.GetIssues().ToString()); } - Y_UNIT_TEST(QueryClientTimeout) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(QueryClientTimeout, WithMvcc) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(WithMvcc) + .SetEnableMvccSnapshotReads(WithMvcc) + .SetEnableKqpImmediateEffects(WithMvcc); + + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -313,7 +318,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { auto txControl = TTxControl::BeginTx().CommitTx(); - NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 1); + NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 2); auto result = session.ExecuteDataQuery( query, |