author    | snaury <snaury@ydb.tech> | 2022-11-23 13:32:27 +0300
committer | snaury <snaury@ydb.tech> | 2022-11-23 13:32:27 +0300
commit    | 5f627a5e2c2917fd26b1fc318b9d8a8918c8fc74 (patch)
tree      | fff3c50b9ed022afc185e5f9575944d77ec9704d
parent    | 90ab819c45e25b684b4bb46ba3aa8860e0972bb1 (diff)
download  | ydb-5f627a5e2c2917fd26b1fc318b9d8a8918c8fc74.tar.gz
Support generic readsets in datashards
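The change is gated by the new EnableDataShardGenericReadSets feature flag (and forced on when topic operations participate in the transaction): the KQP data executer stamps UseGenericReadSets on every per-shard NKikimrTxDataShard::TKqpTransaction, and datashards then exchange NKikimrTx::TReadSetData envelopes instead of raw TKqpReadset bodies. Below is a minimal sketch of the packing/unpacking pattern, distilled from the diff that follows; the locksBroken flag is a hypothetical local, and error handling plus the surrounding datashard state are omitted, so treat it as an illustration rather than the exact datashard code.

    // Sender side (sketch): wrap the KQP payload in a generic readset envelope.
    NKikimrTxDataShard::TKqpReadset kqpReadset;     // filled elsewhere with outputs / lock results
    NKikimrTx::TReadSetData generic;
    generic.SetDecision(locksBroken                 // 'locksBroken' is hypothetical for this sketch
        ? NKikimrTx::TReadSetData::DECISION_ABORT
        : NKikimrTx::TReadSetData::DECISION_COMMIT);
    generic.MutableData()->PackFrom(kqpReadset);    // google.protobuf.Any payload (may be left empty)
    TString body;
    Y_VERIFY(generic.SerializeToString(&body));     // this string becomes the readset body

    // Receiver side (sketch): the commit/abort decision is readable without the KQP payload.
    NKikimrTx::TReadSetData parsed;
    Y_VERIFY(parsed.ParseFromString(body));
    if (parsed.GetDecision() != NKikimrTx::TReadSetData::DECISION_COMMIT) {
        // abort: the sender saw broken locks; details are not included in the envelope
    }
    if (parsed.HasData()) {
        NKikimrTxDataShard::TKqpReadset unpacked;
        Y_VERIFY(parsed.GetData().UnpackTo(&unpacked));
    }

Keeping the decision in the envelope is presumably what lets a receiver act on a readset without understanding the KQP-specific payload, which would also explain why the generic format is forced whenever topic transactions take part.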
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 37
-rw-r--r-- | ydb/core/protos/config.proto | 1
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 3
-rw-r--r-- | ydb/core/testlib/basics/feature_flags.h | 1
-rw-r--r-- | ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp | 2
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 104
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.h | 4
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_rs.cpp | 120
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 2
9 files changed, 244 insertions, 30 deletions
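On the executer side (kqp_data_executer.cpp below) there are two related changes: the per-shard transactions are stamped with UseGenericReadSets when the feature flag is set or topic transactions are present, and a LOCKS_BROKEN reply no longer always aborts immediately; when the offending table is unknown the executer only records LocksBroken, keeps collecting shard replies, and reports a generic "Transaction locks invalidated." error from Finalize(). A compressed paraphrase of that flow, using the member names from the diff (not the literal code):

    // Building per-shard transactions:
    const bool useGenericReadSets =
        AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() || !topicTxs.empty();
    if (useGenericReadSets) {
        for (auto& pr : datashardTxs) {
            pr.second.SetUseGenericReadSets(true);  // NKikimrTxDataShard::TKqpTransaction flag
        }
    }

    // Handling a LOCKS_BROKEN shard reply (compressed):
    shardState->State = TShardState::EState::Finished;
    LocksBroken = true;
    if (tableName) {
        auto message = TStringBuilder()
            << "Transaction locks invalidated. Table: " << *tableName;
        return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
            YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
    }
    CheckExecutionComplete();  // otherwise wait; Finalize() replies ABORTED once all shards answered
    return;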
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 9b288e0096a..a1dedbbe631 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -741,8 +741,13 @@ private:
             case NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN: {
                 LOG_D("Broken locks: " << res->Record.DebugString());
 
+                YQL_ENSURE(shardState->State == TShardState::EState::Executing);
+                shardState->State = TShardState::EState::Finished;
+                Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?
+                LocksBroken = true;
+
                 TMaybe<TString> tableName;
                 if (!res->Record.GetTxLocks().empty()) {
                     auto& lock = res->Record.GetTxLocks(0);
@@ -753,13 +758,18 @@ private:
                     }
                 }
 
-                auto message = TStringBuilder() << "Transaction locks invalidated.";
+                // Reply as soon as we know which table had locks invalidated
                 if (tableName) {
-                    message << " Table: " << *tableName;
+                    auto message = TStringBuilder()
+                        << "Transaction locks invalidated. Table: " << *tableName;
+
+                    return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
+                        YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
                 }
 
-                return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
-                    YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
+                // Receive more replies from other shards
+                CheckExecutionComplete();
+                return;
             }
             case NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED: {
                 YQL_ENSURE(false);
@@ -1754,6 +1764,8 @@ private:
         Request.TopicOperations.BuildTopicTxs(topicTxs);
 
+        const bool useGenericReadSets = AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() || !topicTxs.empty();
+
         if (auto locksMap = ExtractLocks(Request.Locks);
             !locksMap.empty() || Request.TopicOperations.HasReadOperations())
         {
             YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks);
@@ -1783,7 +1795,7 @@ private:
                     tx.MutableLocks()->MutableLocks()->Add()->Swap(&lock);
                 }
 
-                if ((!locksList.empty() || Request.TopicOperations.HasReadOperations()) && Request.ValidateLocks) {
+                if (!locksList.empty() && Request.ValidateLocks) {
                     locksSendingShards.insert(shardId);
                 }
             }
@@ -1821,6 +1833,13 @@ private:
             }
         }
 
+        if (useGenericReadSets) {
+            // Make sure datashards use generic readsets
+            for (auto& pr : datashardTxs) {
+                pr.second.SetUseGenericReadSets(true);
+            }
+        }
+
         return datashardTxs;
     }
 
@@ -1962,6 +1981,13 @@ private:
     }
 
     void Finalize() {
+        if (LocksBroken) {
+            TString message = "Transaction locks invalidated.";
+
+            return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
+                YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
+        }
+
         auto& response = *ResponseEv->Record.MutableResponse();
         response.SetStatus(Ydb::StatusIds::SUCCESS);
 
@@ -2151,6 +2177,7 @@ private:
     bool ImmediateTx = false;
     bool UseFollowers = false;
    bool TxPlanned = false;
+    bool LocksBroken = false;
 
     TInstant FirstPrepareReply;
     TInstant LastPrepareReply;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index ad2cf7f7643..17a6a8d0f41 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -748,6 +748,7 @@ message TFeatureFlags {
     optional bool EnableKqpScanQuerySourceRead = 78 [default = false];
     optional bool EnableDynamicNodeAuthorization = 79 [default = false];
     optional bool EnableKqpImmediateEffects = 80 [default = false];
+    optional bool EnableDataShardGenericReadSets = 81 [default = false];
 }
 
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index e4adad9f190..19d8bcb15af 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -236,6 +236,9 @@ message TKqpTransaction {
     reserved 5;
 
     optional NKikimrKqp.TKqpSnapshot Snapshot = 6;
+
+    // When true datashard will exchange a generic TReadSetData with other shards
+    optional bool UseGenericReadSets = 7 [default = false];
 }
 
 message TKqpReadRangesSourceSettings {
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 85a086c269a..9b7c6bd89e8 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -42,6 +42,7 @@ public:
     FEATURE_FLAG_SETTER(EnableGrpcAudit)
     FEATURE_FLAG_SETTER(EnableChangefeedInitialScan)
    FEATURE_FLAG_SETTER(EnableKqpImmediateEffects)
+    FEATURE_FLAG_SETTER(EnableDataShardGenericReadSets)
 
     TDerived& SetEnableMvcc(std::optional<bool> value) {
         if (value) {
diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
index e0c1245254e..585109ef45e 100644
--- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
@@ -97,7 +97,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
         dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);
 
         if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
-            auto result = KqpRunTransaction(ctx, op->GetTxId(), dataTx->GetKqpTasks(), tasksRunner);
+            auto result = KqpRunTransaction(ctx, op->GetTxId(), kqpTx, tasksRunner);
 
             Y_VERIFY_S(!dataTx->GetKqpComputeCtx().HadInconsistentReads(),
                 "Unexpected inconsistent reads in operation " << *op << " when preparing persistent channels");
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 6085dd59376..c0188834c50 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -87,11 +87,11 @@ NUdf::EFetchStatus FetchAllOutput(NDq::IDqOutputChannel* channel, NDqProto::TDat
 }
 
 NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
-    const TInputOpData::TInReadSets* inReadSets, const google::protobuf::RepeatedPtrField<NDqProto::TDqTask>& tasks,
+    const TInputOpData::TInReadSets* inReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx,
     NKqp::TKqpTasksRunner& tasksRunner, bool applyEffects)
 {
     THashMap<ui64, std::pair<ui64, ui32>> inputChannelsMap; // channelId -> (taskId, input index)
-    for (auto& task : tasks) {
+    for (auto& task : kqpTx.GetTasks()) {
         for (ui32 i = 0; i < task.InputsSize(); ++i) {
             auto& input = task.GetInputs(i);
             for (auto& channel : input.GetChannels()) {
@@ -114,7 +114,20 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
 
         for (auto& data : dataList) {
             NKikimrTxDataShard::TKqpReadset kqpReadset;
-            Y_PROTOBUF_SUPPRESS_NODISCARD kqpReadset.ParseFromString(data.Body);
+            if (kqpTx.GetUseGenericReadSets()) {
+                NKikimrTx::TReadSetData genericData;
+                bool ok = genericData.ParseFromString(data.Body);
+                Y_VERIFY(ok, "Failed to parse generic readset data from %" PRIu64 " to %" PRIu64 " origin %" PRIu64,
+                    source, target, data.Origin);
+
+                if (genericData.HasData()) {
+                    ok = genericData.GetData().UnpackTo(&kqpReadset);
+                    Y_VERIFY(ok, "Failed to parse kqp readset data from %" PRIu64 " to %" PRIu64 " origin %" PRIu64,
+                        source, target, data.Origin);
+                }
+            } else {
+                Y_PROTOBUF_SUPPRESS_NODISCARD kqpReadset.ParseFromString(data.Body);
+            }
 
             for (int outputId = 0; outputId < kqpReadset.GetOutputs().size(); ++outputId) {
                 auto* channelData = kqpReadset.MutableOutputs()->Mutable(outputId);
@@ -151,7 +164,7 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
         MKQL_ENSURE_S(runStatus == NDq::ERunStatus::PendingInput);
 
         hasInputChanges = false;
-        for (auto& task : tasks) {
+        for (auto& task : kqpTx.GetTasks()) {
             for (ui32 i = 0; i < task.OutputsSize(); ++i) {
                 for (auto& channel : task.GetOutputs(i).GetChannels()) {
                     if (auto* inputInfo = inputChannelsMap.FindPtr(channel.GetId())) {
@@ -476,17 +489,17 @@ void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLoc
 }
 
 NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
-    const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner)
+    const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner)
 {
-    return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, tasks, tasksRunner, /* applyEffects */ false);
+    return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, kqpTx, tasksRunner, /* applyEffects */ false);
 }
 
 THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx, ui64 origin,
     ui64 txId, const TInputOpData::TInReadSets* inReadSets,
-    const google::protobuf::RepeatedPtrField<NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner,
+    const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner,
     const NMiniKQL::TKqpDatashardComputeContext& computeCtx)
 {
-    auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, tasks, tasksRunner, /* applyEffects */ true);
+    auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, kqpTx, tasksRunner, /* applyEffects */ true);
 
     if (computeCtx.HadInconsistentReads()) {
         return nullptr;
@@ -501,7 +514,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
     auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId,
         NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
 
-    for (auto& task : tasks) {
+    for (auto& task : kqpTx.GetTasks()) {
         auto& taskRunner = tasksRunner.GetTaskRunner(task.GetId());
 
         for (ui32 i = 0; i < task.OutputsSize(); ++i) {
@@ -576,6 +589,9 @@
         }
     }
 
+    NKikimrTx::TReadSetData::EDecision decision = NKikimrTx::TReadSetData::DECISION_COMMIT;
+    TMap<std::pair<ui64, ui64>, NKikimrTx::TReadSetData> genericData;
+
     if (kqpTx.HasLocks() && NeedValidateLocks(kqpTx.GetLocks().GetOp())) {
         bool sendLocks = SendLocks(kqpTx.GetLocks(), tabletId);
         YQL_ENSURE(sendLocks == !kqpTx.GetLocks().GetLocks().empty());
@@ -589,7 +605,11 @@
             for (auto& lock : brokenLocks) {
                 LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
                     "Found broken lock: " << lock.ShortDebugString());
-                validateLocksResult.AddBrokenLocks()->Swap(&lock);
+                if (kqpTx.GetUseGenericReadSets()) {
+                    decision = NKikimrTx::TReadSetData::DECISION_ABORT;
+                } else {
+                    validateLocksResult.AddBrokenLocks()->Swap(&lock);
+                }
             }
 
             for (auto& dstTabletId : kqpTx.GetLocks().GetReceivingShards()) {
@@ -601,16 +621,39 @@
                     << tabletId << " to " << dstTabletId << ", locks: " << validateLocksResult.ShortDebugString());
 
                 auto key = std::make_pair(tabletId, dstTabletId);
-                readsetData[key].MutableValidateLocksResult()->CopyFrom(validateLocksResult);
+                if (kqpTx.GetUseGenericReadSets()) {
+                    genericData[key].SetDecision(decision);
+                } else {
+                    readsetData[key].MutableValidateLocksResult()->CopyFrom(validateLocksResult);
+                }
             }
         }
     }
 
-    for (auto& [key, data] : readsetData) {
-        TString bodyStr;
-        Y_PROTOBUF_SUPPRESS_NODISCARD data.SerializeToString(&bodyStr);
+    if (kqpTx.GetUseGenericReadSets()) {
+        for (const auto& [key, data] : readsetData) {
+            bool ok = genericData[key].MutableData()->PackFrom(data);
+            Y_VERIFY(ok, "Failed to pack readset data from %" PRIu64 " to %" PRIu64, key.first, key.second);
+        }
+
+        for (auto& [key, data] : genericData) {
+            if (!data.HasDecision()) {
+                data.SetDecision(decision);
+            }
 
-        outReadSets[key] = bodyStr;
+            TString bodyStr;
+            bool ok = data.SerializeToString(&bodyStr);
+            Y_VERIFY(ok, "Failed to serialize readset from %" PRIu64 " to %" PRIu64, key.first, key.second);
+
+            outReadSets[key] = std::move(bodyStr);
+        }
+    } else {
+        for (auto& [key, data] : readsetData) {
+            TString bodyStr;
+            Y_PROTOBUF_SUPPRESS_NODISCARD data.SerializeToString(&bodyStr);
+
+            outReadSets[key] = bodyStr;
+        }
+    }
 }
@@ -645,22 +688,41 @@
     for (auto& readSet : tx->InReadSets()) {
         for (auto& data : readSet.second) {
-            NKikimrTxDataShard::TKqpReadset kqpReadset;
-            Y_PROTOBUF_SUPPRESS_NODISCARD kqpReadset.ParseFromString(data.Body);
+            if (kqpTx.GetUseGenericReadSets()) {
+                NKikimrTx::TReadSetData genericData;
+                bool ok = genericData.ParseFromString(data.Body);
+                Y_VERIFY(ok, "Failed to parse generic readset from %" PRIu64 " to %" PRIu64 " origin %" PRIu64,
+                    readSet.first.first, readSet.first.second, data.Origin);
 
-            if (kqpReadset.HasValidateLocksResult()) {
-                auto& validateResult = kqpReadset.GetValidateLocksResult();
-                if (!validateResult.GetSuccess()) {
+                if (genericData.GetDecision() != NKikimrTx::TReadSetData::DECISION_COMMIT) {
                     tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
                         NKikimrTxDataShard::TX_KIND_DATA,
                         origin,
                         tx->GetTxId(),
                         NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
 
-                    tx->Result()->Record.MutableTxLocks()->CopyFrom(validateResult.GetBrokenLocks());
+                    // Note: we don't know details on what failed at that shard
 
                     return false;
                 }
+            } else {
+                NKikimrTxDataShard::TKqpReadset kqpReadset;
+                Y_PROTOBUF_SUPPRESS_NODISCARD kqpReadset.ParseFromString(data.Body);
+
+                if (kqpReadset.HasValidateLocksResult()) {
+                    auto& validateResult = kqpReadset.GetValidateLocksResult();
+                    if (!validateResult.GetSuccess()) {
+                        tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
+                            NKikimrTxDataShard::TX_KIND_DATA,
+                            origin,
+                            tx->GetTxId(),
+                            NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
+
+                        tx->Result()->Record.MutableTxLocks()->CopyFrom(validateResult.GetBrokenLocks());
+
+                        return false;
+                    }
+                }
             }
         }
     }
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index 370266cba60..52086ea0f70 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -21,11 +21,11 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo,
 void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLocks& sysLocks, TEngineBay& engineBay);
 
 NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
-    const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner);
+    const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner);
 
 THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx, ui64 origin,
     ui64 txId, const TInputOpData::TInReadSets* inReadSets,
-    const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner,
+    const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner,
     const NMiniKQL::TKqpDatashardComputeContext& computeCtx);
 
 void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx,
diff --git a/ydb/core/tx/datashard/datashard_ut_rs.cpp b/ydb/core/tx/datashard/datashard_ut_rs.cpp
index 6442648fdce..b71bea89218 100644
--- a/ydb/core/tx/datashard/datashard_ut_rs.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_rs.cpp
@@ -1,5 +1,6 @@
 #include "defs.h"
 #include "datashard_ut_common.h"
+#include "datashard_ut_common_kqp.h"
 
 #include <ydb/core/testlib/test_client.h>
 #include <ydb/core/tx/schemeshard/schemeshard.h>
@@ -10,6 +11,7 @@
 
 namespace NKikimr {
 
+using namespace NKikimr::NDataShard::NKqpHelpers;
 using namespace NSchemeShard;
 using namespace Tests;
 
@@ -324,6 +326,124 @@ Y_UNIT_TEST_SUITE(TDataShardRSTest) {
         runtime.Register(CreateTabletKiller(shard1));
         WaitTabletBecomesOffline(server, shard2);
     }
+
+    Y_UNIT_TEST(TestGenericReadSetDecisionCommit) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableDataShardGenericReadSets(true);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)");
+
+        TString sessionId, txId;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleBegin(runtime, sessionId, txId, R"(
+                SELECT key, value FROM `/Root/table-1`
+                ORDER BY key
+                )"),
+            "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+
+        size_t readSets = 0;
+        auto observeReadSets = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+            switch (ev->GetTypeRewrite()) {
+                case TEvTxProcessing::TEvReadSet::EventType: {
+                    auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+                    NKikimrTx::TReadSetData genericData;
+                    Y_VERIFY(genericData.ParseFromString(msg->Record.GetReadSet()));
+                    Cerr << "... generic readset: " << genericData.DebugString() << Endl;
+                    UNIT_ASSERT(genericData.HasDecision());
+                    UNIT_ASSERT(genericData.GetDecision() == NKikimrTx::TReadSetData::DECISION_COMMIT);
+                    UNIT_ASSERT(!genericData.HasData());
+                    UNIT_ASSERT(genericData.unknown_fields().empty());
+                    ++readSets;
+                    break;
+                }
+            }
+            return TTestActorRuntime::EEventAction::PROCESS;
+        };
+        auto prevObserver = runtime.SetObserverFunc(observeReadSets);
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleCommit(runtime, sessionId, txId, R"(
+                UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)
+                )"),
+            "<empty>");
+        UNIT_ASSERT(readSets > 0);
+    }
+
+    Y_UNIT_TEST(TestGenericReadSetDecisionAbort) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableDataShardGenericReadSets(true);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)");
+
+        TString sessionId, txId;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleBegin(runtime, sessionId, txId, R"(
+                SELECT key, value FROM `/Root/table-1`
+                ORDER BY key
+                )"),
+            "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)");
+
+        size_t readSets = 0;
+        auto observeReadSets = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+            switch (ev->GetTypeRewrite()) {
+                case TEvTxProcessing::TEvReadSet::EventType: {
+                    auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+                    NKikimrTx::TReadSetData genericData;
+                    Y_VERIFY(genericData.ParseFromString(msg->Record.GetReadSet()));
+                    Cerr << "... generic readset: " << genericData.DebugString() << Endl;
+                    UNIT_ASSERT(genericData.HasDecision());
+                    UNIT_ASSERT(genericData.GetDecision() == NKikimrTx::TReadSetData::DECISION_ABORT);
+                    UNIT_ASSERT(!genericData.HasData());
+                    UNIT_ASSERT(genericData.unknown_fields().empty());
+                    ++readSets;
+                    break;
+                }
+            }
+            return TTestActorRuntime::EEventAction::PROCESS;
+        };
+        auto prevObserver = runtime.SetObserverFunc(observeReadSets);
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleCommit(runtime, sessionId, txId, R"(
+                UPSERT INTO `/Root/table-2` (key, value) VALUES (3, 3)
+                )"),
+            "ERROR: ABORTED");
+        UNIT_ASSERT(readSets > 0);
+    }
 }
 
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 304d94b17f7..d211051c9ab 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -204,7 +204,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
         auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
 
         auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(),
-            op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), dataTx->GetKqpTasks(), tasksRunner, computeCtx);
+            op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), kqpTx, tasksRunner, computeCtx);
 
         if (!result && computeCtx.HadInconsistentReads()) {
             LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId