aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-05-23 11:59:08 +0300
committerGitHub <noreply@github.com>2025-05-23 11:59:08 +0300
commit7f7198516baf2d96e715bb4d4d63cdcf68e58769 (patch)
tree813af5897866a5917dbeea883a38d16d7da0b6f7
parent31cd97f267042bebbf3fbbae52d5c9336d5ca958 (diff)
downloadydb-7f7198516baf2d96e715bb4d4d63cdcf68e58769.tar.gz
Fix lost commit without write (EvWrite) (#18634)
-rw-r--r--ydb/core/kqp/common/kqp_tx_manager.cpp5
-rw-r--r--ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp80
2 files changed, 84 insertions, 1 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp
index db1a63cd27e..bba5fc63987 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.cpp
+++ b/ydb/core/kqp/common/kqp_tx_manager.cpp
@@ -51,6 +51,7 @@ public:
if (action & EAction::WRITE) {
ReadOnly = false;
}
+ ++ActionsCount;
}
void AddTopic(ui64 topicId, const TString& path) override {
@@ -310,7 +311,8 @@ public:
}
bool NeedCommit() const override {
- const bool dontNeedCommit = IsEmpty() || IsReadOnly() && (IsSingleShard() || HasSnapshot());
+ AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard()
+ const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot());
return !dontNeedCommit;
}
@@ -515,6 +517,7 @@ private:
THashSet<ui64> ShardsIds;
THashMap<ui64, TShardInfo> ShardsInfo;
std::unordered_set<TString> TablePathes;
+ ui64 ActionsCount = 0;
THashSet<ui32> ParticipantNodes;
diff --git a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp
index f3328caad1b..92c8cc06bc0 100644
--- a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp
+++ b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/gateway/kqp_metadata_loader.h>
#include <ydb/core/kqp/host/kqp_host_impl.h>
+#include <ydb/core/tx/data_events/events.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
@@ -246,5 +247,84 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}
+
+ Y_UNIT_TEST(TestNoWrite) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
+
+ auto setting = NKikimrKqp::TKqpSetting();
+ TKikimrSettings settings;
+ settings.SetAppConfig(appConfig);
+ settings.SetUseRealThreads(false);
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
+ auto deleteSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
+
+ auto& runtime = *kikimr.GetTestServer().GetRuntime();
+
+ kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; });
+
+ {
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ const TString query(Q1_(R"(
+ SELECT * FROM `/Root/KeyValue` WHERE Key = 3u;
+
+ UPDATE `/Root/KeyValue` SET Value = "Test" WHERE Key = 3u AND Value = "Not exists";
+ )"));
+
+ std::vector<std::unique_ptr<IEventHandle>> writes;
+ bool blockWrites = true;
+
+ auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
+ if (blockWrites && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
+ auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
+ UNIT_ASSERT(evWrite->Record.OperationsSize() == 0);
+ UNIT_ASSERT(evWrite->Record.GetLocks().GetLocks().size() != 0);
+ writes.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&writes](IEventHandle&) {
+ return writes.size() > 0;
+ });
+
+ runtime.SetObserverFunc(grab);
+
+ auto future = kikimr.RunInThreadPool([&]{
+ auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
+ return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
+ });
+
+ runtime.DispatchEvents(opts);
+ UNIT_ASSERT(writes.size() > 0);
+
+ blockWrites = false;
+
+ const TString deleteQuery(Q1_(R"(
+ DELETE FROM `/Root/KeyValue` WHERE Key = 3u;
+ )"));
+
+ auto deleteResult = kikimr.RunCall([&]{
+ auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
+ return deleteSession.ExecuteDataQuery(deleteQuery, txc, execSettings).ExtractValueSync();
+ });
+
+ UNIT_ASSERT(deleteResult.IsSuccess());
+
+ for(auto& ev: writes) {
+ runtime.Send(ev.release());
+ }
+
+ auto result = runtime.WaitFuture(future);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ }
+ }
}
}