aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-01-12 20:24:39 +0300
committerdcherednik <dcherednik@ydb.tech>2023-01-12 20:24:39 +0300
commit8779e0af5f7fe453a9c4e6cc1eea9ca0c7353352 (patch)
treea1a4bcfef52639fbdec41ff509810b1e94b68aee
parent5204dcae729d33a7269a03a55b07b81f9a5910f5 (diff)
downloadydb-8779e0af5f7fe453a9c4e6cc1eea9ca0c7353352.tar.gz
Abort immediate tx in case of client lost:
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp52
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h14
-rw-r--r--ydb/core/kqp/ut/query/kqp_query_ut.cpp13
3 files changed, 65 insertions, 14 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index f4dc9c903b..eff1912d7c 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -155,13 +155,15 @@ public:
for (const auto& x : ShardStates) {
if (x.second.State != TShardState::EState::Finished) {
notFinished++;
- LOG_D("Datashard " << x.first << " not finished yet: " << ToString(x.second.State));
+ LOG_D("ActorState: " << CurrentStateFuncName()
+ << ", datashard " << x.first << " not finished yet: " << ToString(x.second.State));
}
}
for (const auto& x : TopicTabletStates) {
if (x.second.State != TShardState::EState::Finished) {
++notFinished;
- LOG_D("TopicTablet " << x.first << " not finished yet: " << ToString(x.second.State));
+ LOG_D("ActorState: " << CurrentStateFuncName()
+ << ", topicTablet " << x.first << " not finished yet: " << ToString(x.second.State));
}
}
if (notFinished == 0 && TBase::CheckExecutionComplete()) {
@@ -169,7 +171,8 @@ public:
}
if (IsDebugLogEnabled()) {
- auto sb = TStringBuilder() << "Waiting for " << PendingComputeActors.size() << " compute actor(s) and "
+ auto sb = TStringBuilder() << "ActorState: " << CurrentStateFuncName()
+ << ", waiting for " << PendingComputeActors.size() << " compute actor(s) and "
<< notFinished << " datashard(s): ";
for (const auto& shardId : PendingComputeActors) {
sb << "CA " << shardId.first << ", ";
@@ -270,6 +273,21 @@ public:
}
private:
+ TString CurrentStateFuncName() const override {
+ const auto& func = CurrentStateFunc();
+ if (func == &TThis::PrepareState) {
+ return "PrepareState";
+ } else if (func == &TThis::ExecuteState) {
+ return "ExecuteState";
+ } else if (func == &TThis::WaitSnapshotState) {
+ return "WaitSnapshotState";
+ } else if (func == &TThis::WaitResolveState) {
+ return "WaitResolveState";
+ } else {
+ return TBase::CurrentStateFuncName();
+ }
+ }
+
STATEFN(PrepareState) {
try {
switch (ev->GetTypeRewrite()) {
@@ -459,8 +477,13 @@ private:
void CancelProposal(ui64 exceptShardId) {
for (auto& [shardId, state] : ShardStates) {
if (shardId != exceptShardId &&
- (state.State == TShardState::EState::Preparing || state.State == TShardState::EState::Prepared))
+ (state.State == TShardState::EState::Preparing
+ || state.State == TShardState::EState::Prepared
+ || (state.State == TShardState::EState::Executing && ImmediateTx)))
{
+ ui64 id = shardId;
+ LOG_D("Send CancelTransactionProposal to shard: " << id);
+
state.State = TShardState::EState::Finished;
YQL_ENSURE(!state.DatashardState->Follower);
@@ -746,7 +769,7 @@ private:
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvChannelData, HandleExecute);
- hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
+ hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
hFunc(TEvents::TEvWakeup, HandleTimeout);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
default:
@@ -786,6 +809,13 @@ private:
}
}
+ void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) {
+ if (ImmediateTx) {
+ CancelProposal(0);
+ }
+ TBase::HandleAbortExecution(ev);
+ }
+
void HandleExecute(TEvDataShard::TEvProposeTransactionResult::TPtr& ev) {
TEvDataShard::TEvProposeTransactionResult* res = ev->Get();
const ui64 shardId = res->GetOrigin();
@@ -1216,7 +1246,8 @@ private:
for (auto& shardTask : shardTasks) {
auto& task = TasksGraph.GetTask(shardTask.second);
- LOG_D("Stage " << stageInfo.Id << " create datashard task: " << shardTask.second
+ LOG_D("ActorState: " << CurrentStateFuncName()
+ << ", stage: " << stageInfo.Id << " create datashard task: " << shardTask.second
<< ", shard: " << shardTask.first
<< ", meta: " << task.Meta.ToString(keyTypes, *AppData()->TypeRegistry));
}
@@ -1318,7 +1349,8 @@ private:
auto locksCount = dataTransaction.GetKqpTransaction().GetLocks().LocksSize();
shardState.DatashardState->ShardReadLocks = locksCount > 0;
- LOG_D("Executing KQP transaction on shard: " << shardId
+ LOG_D("State: " << CurrentStateFuncName()
+ << ", Executing KQP transaction on shard: " << shardId
<< ", tasks: [" << JoinStrings(shardState.TaskIds.begin(), shardState.TaskIds.end(), ",") << "]"
<< ", lockTxId: " << lockTxId
<< ", locks: " << dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString());
@@ -1765,14 +1797,16 @@ private:
ExecuteTasks();
if (ImmediateTx) {
- LOG_T("Immediate tx, become ExecuteState");
+ LOG_D("ActorState: " << CurrentStateFuncName()
+ << ", immediate tx, become ExecuteState");
Become(&TKqpDataExecuter::ExecuteState);
if (ExecuterStateSpan) {
ExecuterStateSpan.End();
ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterExecuteState, ExecuterSpan.GetTraceId(), "ExecuteState", NWilson::EFlags::AUTO_END);
}
} else {
- LOG_T("Not immediate tx, become PrepareState");
+ LOG_D("ActorState: " << CurrentStateFuncName()
+ << ", not immediate tx, become PrepareState");
Become(&TKqpDataExecuter::PrepareState);
if (ExecuterStateSpan) {
ExecuterStateSpan.End();
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 08edda9e27..388d794d0f 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -181,7 +181,8 @@ protected:
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();
- LOG_D("Got execution state from compute actor: " << computeActor
+ LOG_D("ActorState: " << CurrentStateFuncName()
+ << ", got execution state from compute actor: " << computeActor
<< ", task: " << taskId
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
<< ", stats: " << state.GetStats());
@@ -1015,6 +1016,17 @@ protected:
}
protected:
+ virtual TString CurrentStateFuncName() const {
+ const auto& func = this->CurrentStateFunc();
+ if (func == &TKqpExecuterBase::ZombieState) {
+ return "ZombieState";
+ } else if (func == &TKqpExecuterBase::ReadyState) {
+ return "ReadyState";
+ } else {
+ return "unknown state";
+ }
+ }
+
TString DebugString() const {
TStringBuilder sb;
sb << "[KqpExecuter], type: " << (ExecType == EExecType::Data ? "Data" : "Scan")
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index 91c1f4b7ba..5de239abee 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -21,7 +21,6 @@ static void CheckStatusAfterTimeout(TSession& session, const TString& query, con
const TInstant start = TInstant::Now();
while (true) {
auto result = session.ExecuteDataQuery(query, txControl).ExtractValueSync();
- Cerr << "STATUS: " << result.GetStatus() << " " << result.GetIssues().ToString() << Endl;
UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, result.GetIssues().ToString());
if (result.GetStatus() == EStatus::SUCCESS) {
break;
@@ -300,8 +299,14 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SESSION_BUSY, true, result.GetIssues().ToString());
}
- Y_UNIT_TEST(QueryClientTimeout) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_TWIN(QueryClientTimeout, WithMvcc) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(WithMvcc)
+ .SetEnableMvccSnapshotReads(WithMvcc)
+ .SetEnableKqpImmediateEffects(WithMvcc);
+
+ TKikimrRunner kikimr(serverSettings);
+
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -313,7 +318,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
auto txControl = TTxControl::BeginTx().CommitTx();
- NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 1);
+ NDataShard::gSkipRepliesFailPoint.Enable(-1, -1, 2);
auto result = session.ExecuteDataQuery(
query,