aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-02-20 12:10:40 +0300
committerssmike <ssmike@ydb.tech>2023-02-20 12:10:40 +0300
commit16cad265c4a2eea02a9243db010b89dab1890a46 (patch)
tree42692d1ca733dcd0e5fa73aed257acbb16a89ebd
parentebbaae7d3565a888c08e2ad0313f930a43018893 (diff)
downloadydb-16cad265c4a2eea02a9243db010b89dab1890a46.tar.gz
fix immediate tx condition
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp16
1 files changed, 12 insertions, 4 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 7267944eb84..e8258c7c5ed 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -308,6 +308,10 @@ private:
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
hFunc(TEvents::TEvWakeup, HandlePrepare);
+ hFunc(TEvents::TEvUndelivered, HandleUndelivered);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
+ hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
+ IgnoreFunc(TEvInterconnect::TEvNodeConnected);
default: {
CancelProposal(0);
UnexpectedEvent("PrepareState", ev->GetTypeRewrite());
@@ -1302,8 +1306,9 @@ private:
break;
}
- case NKqpProto::TKqpPhyConnection::kMap:
- case NKqpProto::TKqpPhyConnection::kStreamLookup: {
+ case NKqpProto::TKqpPhyConnection::kStreamLookup:
+ HasStreamLookup = true;
+ case NKqpProto::TKqpPhyConnection::kMap: {
partitionsCount = originStageInfo.Tasks.size();
break;
}
@@ -1447,6 +1452,7 @@ private:
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
RequestControls.Reqister(TlsActivationContext->AsActorContext());
+ size_t readActors = 0;
ReadOnlyTx = !Request.TopicOperations.HasOperations();
for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) {
auto& tx = Request.Transactions[txIdx];
@@ -1481,7 +1487,7 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId);
+ readActors += BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId);
break;
default:
YQL_ENSURE(false, "unknown source type");
@@ -1672,7 +1678,7 @@ private:
auto datashardTxs = BuildDatashardTxs(datashardTasks, topicTxs);
// Single-shard transactions are always immediate
- ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize()) <= 1;
+ ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize() + readActors) <= 1 && !HasStreamLookup;
if (ImmediateTx) {
// Transaction cannot be both immediate and volatile
@@ -2290,6 +2296,8 @@ private:
private:
bool StreamResult = false;
+ bool HasStreamLookup = false;
+
NTxProxy::TRequestControls RequestControls;
ui64 TxCoordinator = 0;
THashMap<ui64, TShardState> ShardStates;