aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-11-23 13:32:27 +0300
committersnaury <snaury@ydb.tech>2022-11-23 13:32:27 +0300
commit5f627a5e2c2917fd26b1fc318b9d8a8918c8fc74 (patch)
treefff3c50b9ed022afc185e5f9575944d77ec9704d
parent90ab819c45e25b684b4bb46ba3aa8860e0972bb1 (diff)
downloadydb-5f627a5e2c2917fd26b1fc318b9d8a8918c8fc74.tar.gz
Support generic readsets in datashards
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp37
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/tx_datashard.proto3
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp104
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.h4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_rs.cpp120
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp2
9 files changed, 244 insertions, 30 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 9b288e0096a..a1dedbbe631 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -741,8 +741,13 @@ private:
case NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN: {
LOG_D("Broken locks: " << res->Record.DebugString());
+ YQL_ENSURE(shardState->State == TShardState::EState::Executing);
+ shardState->State = TShardState::EState::Finished;
+
Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?
+ LocksBroken = true;
+
TMaybe<TString> tableName;
if (!res->Record.GetTxLocks().empty()) {
auto& lock = res->Record.GetTxLocks(0);
@@ -753,13 +758,18 @@ private:
}
}
- auto message = TStringBuilder() << "Transaction locks invalidated.";
+ // Reply as soon as we know which table had locks invalidated
if (tableName) {
- message << " Table: " << *tableName;
+ auto message = TStringBuilder()
+ << "Transaction locks invalidated. Table: " << *tableName;
+
+ return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
+ YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
}
- return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
- YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
+ // Receive more replies from other shards
+ CheckExecutionComplete();
+ return;
}
case NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED: {
YQL_ENSURE(false);
@@ -1754,6 +1764,8 @@ private:
Request.TopicOperations.BuildTopicTxs(topicTxs);
+ const bool useGenericReadSets = AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() || !topicTxs.empty();
+
if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty() || Request.TopicOperations.HasReadOperations()) {
YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks);
@@ -1783,7 +1795,7 @@ private:
tx.MutableLocks()->MutableLocks()->Add()->Swap(&lock);
}
- if ((!locksList.empty() || Request.TopicOperations.HasReadOperations()) && Request.ValidateLocks) {
+ if (!locksList.empty() && Request.ValidateLocks) {
locksSendingShards.insert(shardId);
}
}
@@ -1821,6 +1833,13 @@ private:
}
}
+ if (useGenericReadSets) {
+ // Make sure datashards use generic readsets
+ for (auto& pr : datashardTxs) {
+ pr.second.SetUseGenericReadSets(true);
+ }
+ }
+
return datashardTxs;
}
@@ -1962,6 +1981,13 @@ private:
}
void Finalize() {
+ if (LocksBroken) {
+ TString message = "Transaction locks invalidated.";
+
+ return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
+ YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
+ }
+
auto& response = *ResponseEv->Record.MutableResponse();
response.SetStatus(Ydb::StatusIds::SUCCESS);
@@ -2151,6 +2177,7 @@ private:
bool ImmediateTx = false;
bool UseFollowers = false;
bool TxPlanned = false;
+ bool LocksBroken = false;
TInstant FirstPrepareReply;
TInstant LastPrepareReply;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index ad2cf7f7643..17a6a8d0f41 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -748,6 +748,7 @@ message TFeatureFlags {
optional bool EnableKqpScanQuerySourceRead = 78 [default = false];
optional bool EnableDynamicNodeAuthorization = 79 [default = false];
optional bool EnableKqpImmediateEffects = 80 [default = false];
+ optional bool EnableDataShardGenericReadSets = 81 [default = false];
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index e4adad9f190..19d8bcb15af 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -236,6 +236,9 @@ message TKqpTransaction {
reserved 5;
optional NKikimrKqp.TKqpSnapshot Snapshot = 6;
+
+ // When true datashard will exchange a generic TReadSetData with other shards
+ optional bool UseGenericReadSets = 7 [default = false];
}
message TKqpReadRangesSourceSettings {
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 85a086c269a..9b7c6bd89e8 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -42,6 +42,7 @@ public:
FEATURE_FLAG_SETTER(EnableGrpcAudit)
FEATURE_FLAG_SETTER(EnableChangefeedInitialScan)
FEATURE_FLAG_SETTER(EnableKqpImmediateEffects)
+ FEATURE_FLAG_SETTER(EnableDataShardGenericReadSets)
TDerived& SetEnableMvcc(std::optional<bool> value) {
if (value) {
diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
index e0c1245254e..585109ef45e 100644
--- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
@@ -97,7 +97,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);
if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
- auto result = KqpRunTransaction(ctx, op->GetTxId(), dataTx->GetKqpTasks(), tasksRunner);
+ auto result = KqpRunTransaction(ctx, op->GetTxId(), kqpTx, tasksRunner);
Y_VERIFY_S(!dataTx->GetKqpComputeCtx().HadInconsistentReads(),
"Unexpected inconsistent reads in operation " << *op << " when preparing persistent channels");
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 6085dd59376..c0188834c50 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -87,11 +87,11 @@ NUdf::EFetchStatus FetchAllOutput(NDq::IDqOutputChannel* channel, NDqProto::TDat
}
NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
- const TInputOpData::TInReadSets* inReadSets, const google::protobuf::RepeatedPtrField<NDqProto::TDqTask>& tasks,
+ const TInputOpData::TInReadSets* inReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx,
NKqp::TKqpTasksRunner& tasksRunner, bool applyEffects)
{
THashMap<ui64, std::pair<ui64, ui32>> inputChannelsMap; // channelId -> (taskId, input index)
- for (auto& task : tasks) {
+ for (auto& task : kqpTx.GetTasks()) {
for (ui32 i = 0; i < task.InputsSize(); ++i) {
auto& input = task.GetInputs(i);
for (auto& channel : input.GetChannels()) {
@@ -114,7 +114,20 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
for (auto& data : dataList) {
NKikimrTxDataShard::TKqpReadset kqpReadset;
- Y_PROTOBUF_SUPPRESS_NODISCARD kqpReadset.ParseFromString(data.Body);
+ if (kqpTx.GetUseGenericReadSets()) {
+ NKikimrTx::TReadSetData genericData;
+ bool ok = genericData.ParseFromString(data.Body);
+ Y_VERIFY(ok, "Failed to parse generic readset data from %" PRIu64 " to %" PRIu64 " origin %" PRIu64,
+ source, target, data.Origin);
+
+ if (genericData.HasData()) {
+ ok = genericData.GetData().UnpackTo(&kqpReadset);
+ Y_VERIFY(ok, "Failed to parse kqp readset data from %" PRIu64 " to %" PRIu64 " origin %" PRIu64,
+ source, target, data.Origin);
+ }
+ } else {
+ Y_PROTOBUF_SUPPRESS_NODISCARD kqpReadset.ParseFromString(data.Body);
+ }
for (int outputId = 0; outputId < kqpReadset.GetOutputs().size(); ++outputId) {
auto* channelData = kqpReadset.MutableOutputs()->Mutable(outputId);
@@ -151,7 +164,7 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
MKQL_ENSURE_S(runStatus == NDq::ERunStatus::PendingInput);
hasInputChanges = false;
- for (auto& task : tasks) {
+ for (auto& task : kqpTx.GetTasks()) {
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
for (auto& channel : task.GetOutputs(i).GetChannels()) {
if (auto* inputInfo = inputChannelsMap.FindPtr(channel.GetId())) {
@@ -476,17 +489,17 @@ void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLoc
}
NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
- const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner)
+ const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner)
{
- return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, tasks, tasksRunner, /* applyEffects */ false);
+ return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, kqpTx, tasksRunner, /* applyEffects */ false);
}
THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx,
ui64 origin, ui64 txId, const TInputOpData::TInReadSets* inReadSets,
- const google::protobuf::RepeatedPtrField<NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner,
+ const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner,
const NMiniKQL::TKqpDatashardComputeContext& computeCtx)
{
- auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, tasks, tasksRunner, /* applyEffects */ true);
+ auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, kqpTx, tasksRunner, /* applyEffects */ true);
if (computeCtx.HadInconsistentReads()) {
return nullptr;
@@ -501,7 +514,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA,
origin, txId, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
- for (auto& task : tasks) {
+ for (auto& task : kqpTx.GetTasks()) {
auto& taskRunner = tasksRunner.GetTaskRunner(task.GetId());
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
@@ -576,6 +589,9 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
}
}
+ NKikimrTx::TReadSetData::EDecision decision = NKikimrTx::TReadSetData::DECISION_COMMIT;
+ TMap<std::pair<ui64, ui64>, NKikimrTx::TReadSetData> genericData;
+
if (kqpTx.HasLocks() && NeedValidateLocks(kqpTx.GetLocks().GetOp())) {
bool sendLocks = SendLocks(kqpTx.GetLocks(), tabletId);
YQL_ENSURE(sendLocks == !kqpTx.GetLocks().GetLocks().empty());
@@ -589,7 +605,11 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
for (auto& lock : brokenLocks) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Found broken lock: " << lock.ShortDebugString());
- validateLocksResult.AddBrokenLocks()->Swap(&lock);
+ if (kqpTx.GetUseGenericReadSets()) {
+ decision = NKikimrTx::TReadSetData::DECISION_ABORT;
+ } else {
+ validateLocksResult.AddBrokenLocks()->Swap(&lock);
+ }
}
for (auto& dstTabletId : kqpTx.GetLocks().GetReceivingShards()) {
@@ -601,16 +621,39 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
<< tabletId << " to " << dstTabletId << ", locks: " << validateLocksResult.ShortDebugString());
auto key = std::make_pair(tabletId, dstTabletId);
- readsetData[key].MutableValidateLocksResult()->CopyFrom(validateLocksResult);
+ if (kqpTx.GetUseGenericReadSets()) {
+ genericData[key].SetDecision(decision);
+ } else {
+ readsetData[key].MutableValidateLocksResult()->CopyFrom(validateLocksResult);
+ }
}
}
}
- for (auto& [key, data] : readsetData) {
- TString bodyStr;
- Y_PROTOBUF_SUPPRESS_NODISCARD data.SerializeToString(&bodyStr);
+ if (kqpTx.GetUseGenericReadSets()) {
+ for (const auto& [key, data] : readsetData) {
+ bool ok = genericData[key].MutableData()->PackFrom(data);
+ Y_VERIFY(ok, "Failed to pack readset data from %" PRIu64 " to %" PRIu64, key.first, key.second);
+ }
+
+ for (auto& [key, data] : genericData) {
+ if (!data.HasDecision()) {
+ data.SetDecision(decision);
+ }
- outReadSets[key] = bodyStr;
+ TString bodyStr;
+ bool ok = data.SerializeToString(&bodyStr);
+ Y_VERIFY(ok, "Failed to serialize readset from %" PRIu64 " to %" PRIu64, key.first, key.second);
+
+ outReadSets[key] = std::move(bodyStr);
+ }
+ } else {
+ for (auto& [key, data] : readsetData) {
+ TString bodyStr;
+ Y_PROTOBUF_SUPPRESS_NODISCARD data.SerializeToString(&bodyStr);
+
+ outReadSets[key] = bodyStr;
+ }
}
}
@@ -645,22 +688,41 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks)
for (auto& readSet : tx->InReadSets()) {
for (auto& data : readSet.second) {
- NKikimrTxDataShard::TKqpReadset kqpReadset;
- Y_PROTOBUF_SUPPRESS_NODISCARD kqpReadset.ParseFromString(data.Body);
+ if (kqpTx.GetUseGenericReadSets()) {
+ NKikimrTx::TReadSetData genericData;
+ bool ok = genericData.ParseFromString(data.Body);
+ Y_VERIFY(ok, "Failed to parse generic readset from %" PRIu64 " to %" PRIu64 " origin %" PRIu64,
+ readSet.first.first, readSet.first.second, data.Origin);
- if (kqpReadset.HasValidateLocksResult()) {
- auto& validateResult = kqpReadset.GetValidateLocksResult();
- if (!validateResult.GetSuccess()) {
+ if (genericData.GetDecision() != NKikimrTx::TReadSetData::DECISION_COMMIT) {
tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
NKikimrTxDataShard::TX_KIND_DATA,
origin,
tx->GetTxId(),
NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
- tx->Result()->Record.MutableTxLocks()->CopyFrom(validateResult.GetBrokenLocks());
+ // Note: we don't know details on what failed at that shard
return false;
}
+ } else {
+ NKikimrTxDataShard::TKqpReadset kqpReadset;
+ Y_PROTOBUF_SUPPRESS_NODISCARD kqpReadset.ParseFromString(data.Body);
+
+ if (kqpReadset.HasValidateLocksResult()) {
+ auto& validateResult = kqpReadset.GetValidateLocksResult();
+ if (!validateResult.GetSuccess()) {
+ tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
+ NKikimrTxDataShard::TX_KIND_DATA,
+ origin,
+ tx->GetTxId(),
+ NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
+
+ tx->Result()->Record.MutableTxLocks()->CopyFrom(validateResult.GetBrokenLocks());
+
+ return false;
+ }
+ }
}
}
}
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index 370266cba60..52086ea0f70 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -21,11 +21,11 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo,
void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLocks& sysLocks, TEngineBay& engineBay);
NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
- const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner);
+ const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner);
THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx,
ui64 origin, ui64 txId, const TInputOpData::TInReadSets* inReadSets,
- const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, NKqp::TKqpTasksRunner& tasksRunner,
+ const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner,
const NMiniKQL::TKqpDatashardComputeContext& computeCtx);
void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx,
diff --git a/ydb/core/tx/datashard/datashard_ut_rs.cpp b/ydb/core/tx/datashard/datashard_ut_rs.cpp
index 6442648fdce..b71bea89218 100644
--- a/ydb/core/tx/datashard/datashard_ut_rs.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_rs.cpp
@@ -1,5 +1,6 @@
#include "defs.h"
#include "datashard_ut_common.h"
+#include "datashard_ut_common_kqp.h"
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
@@ -10,6 +11,7 @@
namespace NKikimr {
+using namespace NKikimr::NDataShard::NKqpHelpers;
using namespace NSchemeShard;
using namespace Tests;
@@ -324,6 +326,124 @@ Y_UNIT_TEST_SUITE(TDataShardRSTest) {
runtime.Register(CreateTabletKiller(shard1));
WaitTabletBecomesOffline(server, shard2);
}
+
+ Y_UNIT_TEST(TestGenericReadSetDecisionCommit) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableDataShardGenericReadSets(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)");
+
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+
+ size_t readSets = 0;
+ auto observeReadSets = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ NKikimrTx::TReadSetData genericData;
+ Y_VERIFY(genericData.ParseFromString(msg->Record.GetReadSet()));
+ Cerr << "... generic readset: " << genericData.DebugString() << Endl;
+ UNIT_ASSERT(genericData.HasDecision());
+ UNIT_ASSERT(genericData.GetDecision() == NKikimrTx::TReadSetData::DECISION_COMMIT);
+ UNIT_ASSERT(!genericData.HasData());
+ UNIT_ASSERT(genericData.unknown_fields().empty());
+ ++readSets;
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserver = runtime.SetObserverFunc(observeReadSets);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)
+ )"),
+ "<empty>");
+ UNIT_ASSERT(readSets > 0);
+ }
+
+ Y_UNIT_TEST(TestGenericReadSetDecisionAbort) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableDataShardGenericReadSets(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)");
+
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)");
+
+ size_t readSets = 0;
+ auto observeReadSets = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ NKikimrTx::TReadSetData genericData;
+ Y_VERIFY(genericData.ParseFromString(msg->Record.GetReadSet()));
+ Cerr << "... generic readset: " << genericData.DebugString() << Endl;
+ UNIT_ASSERT(genericData.HasDecision());
+ UNIT_ASSERT(genericData.GetDecision() == NKikimrTx::TReadSetData::DECISION_ABORT);
+ UNIT_ASSERT(!genericData.HasData());
+ UNIT_ASSERT(genericData.unknown_fields().empty());
+ ++readSets;
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserver = runtime.SetObserverFunc(observeReadSets);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (3, 3)
+ )"),
+ "ERROR: ABORTED");
+ UNIT_ASSERT(readSets > 0);
+ }
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 304d94b17f7..d211051c9ab 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -204,7 +204,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(),
- op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), dataTx->GetKqpTasks(), tasksRunner, computeCtx);
+ op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), kqpTx, tasksRunner, computeCtx);
if (!result && computeCtx.HadInconsistentReads()) {
LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId