diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-05-23 11:59:08 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-23 11:59:08 +0300 |
commit | 7f7198516baf2d96e715bb4d4d63cdcf68e58769 (patch) | |
tree | 813af5897866a5917dbeea883a38d16d7da0b6f7 | |
parent | 31cd97f267042bebbf3fbbae52d5c9336d5ca958 (diff) | |
download | ydb-7f7198516baf2d96e715bb4d4d63cdcf68e58769.tar.gz |
Fix lost commit without write (EvWrite) (#18634)
-rw-r--r-- | ydb/core/kqp/common/kqp_tx_manager.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp | 80 |
2 files changed, 84 insertions, 1 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index db1a63cd27e..bba5fc63987 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -51,6 +51,7 @@ public: if (action & EAction::WRITE) { ReadOnly = false; } + ++ActionsCount; } void AddTopic(ui64 topicId, const TString& path) override { @@ -310,7 +311,8 @@ public: } bool NeedCommit() const override { - const bool dontNeedCommit = IsEmpty() || IsReadOnly() && (IsSingleShard() || HasSnapshot()); + AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard() + const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot()); return !dontNeedCommit; } @@ -515,6 +517,7 @@ private: THashSet<ui64> ShardsIds; THashMap<ui64, TShardInfo> ShardsInfo; std::unordered_set<TString> TablePathes; + ui64 ActionsCount = 0; THashSet<ui32> ParticipantNodes; diff --git a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp index f3328caad1b..92c8cc06bc0 100644 --- a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp @@ -6,6 +6,7 @@ #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/gateway/kqp_metadata_loader.h> #include <ydb/core/kqp/host/kqp_host_impl.h> +#include <ydb/core/tx/data_events/events.h> #include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h> #include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h> @@ -246,5 +247,84 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } } + + Y_UNIT_TEST(TestNoWrite) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + + auto setting = NKikimrKqp::TKqpSetting(); + TKikimrSettings settings; + settings.SetAppConfig(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + auto deleteSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + + kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; }); + + { + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + const TString query(Q1_(R"( + SELECT * FROM `/Root/KeyValue` WHERE Key = 3u; + + UPDATE `/Root/KeyValue` SET Value = "Test" WHERE Key = 3u AND Value = "Not exists"; + )")); + + std::vector<std::unique_ptr<IEventHandle>> writes; + bool blockWrites = true; + + auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto { + if (blockWrites && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) { + auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>(); + UNIT_ASSERT(evWrite->Record.OperationsSize() == 0); + UNIT_ASSERT(evWrite->Record.GetLocks().GetLocks().size() != 0); + writes.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }; + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&writes](IEventHandle&) { + return writes.size() > 0; + }); + + runtime.SetObserverFunc(grab); + + auto future = kikimr.RunInThreadPool([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync(); + }); + + runtime.DispatchEvents(opts); + UNIT_ASSERT(writes.size() > 0); + + blockWrites = false; + + const TString deleteQuery(Q1_(R"( + DELETE FROM `/Root/KeyValue` WHERE Key = 3u; + )")); + + auto deleteResult = kikimr.RunCall([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return deleteSession.ExecuteDataQuery(deleteQuery, txc, execSettings).ExtractValueSync(); + }); + + UNIT_ASSERT(deleteResult.IsSuccess()); + + for(auto& ev: writes) { + runtime.Send(ev.release()); + } + + auto result = runtime.WaitFuture(future); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } + } } } |