diff options
author | ssmike <ssmike@ydb.tech> | 2023-02-20 12:10:40 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-02-20 12:10:40 +0300 |
commit | 16cad265c4a2eea02a9243db010b89dab1890a46 (patch) | |
tree | 42692d1ca733dcd0e5fa73aed257acbb16a89ebd | |
parent | ebbaae7d3565a888c08e2ad0313f930a43018893 (diff) | |
download | ydb-16cad265c4a2eea02a9243db010b89dab1890a46.tar.gz |
fix immediate tx condition
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 7267944eb84..e8258c7c5ed 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -308,6 +308,10 @@ private: hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare); hFunc(TEvKqp::TEvAbortExecution, HandlePrepare); hFunc(TEvents::TEvWakeup, HandlePrepare); + hFunc(TEvents::TEvUndelivered, HandleUndelivered); + hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); + hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse); + IgnoreFunc(TEvInterconnect::TEvNodeConnected); default: { CancelProposal(0); UnexpectedEvent("PrepareState", ev->GetTypeRewrite()); @@ -1302,8 +1306,9 @@ private: break; } - case NKqpProto::TKqpPhyConnection::kMap: - case NKqpProto::TKqpPhyConnection::kStreamLookup: { + case NKqpProto::TKqpPhyConnection::kStreamLookup: + HasStreamLookup = true; + case NKqpProto::TKqpPhyConnection::kMap: { partitionsCount = originStageInfo.Tasks.size(); break; } @@ -1447,6 +1452,7 @@ private: LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); RequestControls.Reqister(TlsActivationContext->AsActorContext()); + size_t readActors = 0; ReadOnlyTx = !Request.TopicOperations.HasOperations(); for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) { auto& tx = Request.Transactions[txIdx]; @@ -1481,7 +1487,7 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId); + readActors += BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId); break; default: YQL_ENSURE(false, "unknown source type"); @@ -1672,7 +1678,7 @@ private: auto datashardTxs = BuildDatashardTxs(datashardTasks, topicTxs); // Single-shard transactions are always immediate - ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize()) <= 1; + ImmediateTx = (datashardTxs.size() + Request.TopicOperations.GetSize() + readActors) <= 1 && !HasStreamLookup; if (ImmediateTx) { // Transaction cannot be both immediate and volatile @@ -2290,6 +2296,8 @@ private: private: bool StreamResult = false; + bool HasStreamLookup = false; + NTxProxy::TRequestControls RequestControls; ui64 TxCoordinator = 0; THashMap<ui64, TShardState> ShardStates; |