From fef1ffbaef82af08b6201f41c2ed15723a4fe1cf Mon Sep 17 00:00:00 2001
From: spuchin <spuchin@ydb.tech>
Date: Thu, 3 Aug 2023 13:24:08 +0300
Subject: Fix incorrect QueryResultIndex handling in executer result proxy.
 (KIKIMR-18919)

---
 ydb/core/kqp/executer_actor/kqp_executer_impl.cpp |  7 ++++++-
 ydb/core/kqp/executer_actor/kqp_executer_impl.h   |  6 +++---
 ydb/core/kqp/query_data/kqp_query_data.h          |  8 ++++----
 ydb/core/kqp/ut/service/kqp_query_service_ut.cpp  | 17 +++++++++++++++++
 ydb/core/protos/kqp_physical.proto                |  4 ++--
 5 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 5be1366333..24d05ae129 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -23,8 +23,13 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt
         const auto& result = tx->GetResults(i);
         const auto& resultMeta = tx->GetTxResultsMeta()[i];
 
+        TMaybe<ui32> queryResultIndex;
+        if (result.HasQueryResultIndex()) {
+            queryResultIndex = result.GetQueryResultIndex();
+        }
+
         TxResults.emplace_back(result.GetIsStream(), resultMeta.MkqlItemType, &resultMeta.ColumnOrder,
-            result.GetQueryResultIndex());
+            queryResultIndex);
     }
 }
 
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 31092da6b8..cfdf9c94cc 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -780,7 +780,7 @@ protected:
             input.ConnectionInfo = NYql::NDq::TSourceInput{};
 
             // allocating source settings
-            
+
             input.Meta.SourceSettings = TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpReadRangesSourceSettings>();
             NKikimrTxDataShard::TKqpReadRangesSourceSettings* settings = input.Meta.SourceSettings;
             FillTableMeta(stageInfo, settings->MutableTable());
@@ -1026,9 +1026,9 @@ protected:
         const auto& txResult = ResponseEv->TxResults[channel.DstInputIndex];
 
         IActor* proxy;
-        if (txResult.IsStream) {
+        if (txResult.IsStream && txResult.QueryResultIndex.Defined()) {
             proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType,
-                txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats, this->SelfId());
+                txResult.ColumnOrder, *txResult.QueryResultIndex, Target, Stats, this->SelfId());
         } else {
             proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats, this->SelfId(),
                 channel.DstInputIndex, ResponseEv.get());
diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index aa1f8a2337..21c2f61608 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -79,14 +79,14 @@ struct TKqpExecuterTxResult {
     bool IsStream = true;
     NKikimr::NMiniKQL::TType* MkqlItemType;
     const TVector<ui32>* ColumnOrder = nullptr;
-    ui32 QueryResultIndex = 0;
+    TMaybe<ui32> QueryResultIndex = 0;
     NKikimr::NMiniKQL::TUnboxedValueBatch Rows;
 
     explicit TKqpExecuterTxResult(
         bool isStream,
         NKikimr::NMiniKQL::TType* mkqlItemType,
         const TVector<ui32>* сolumnOrder,
-        ui32 queryResultIndex)
+        const TMaybe<ui32>& queryResultIndex)
         : IsStream(isStream)
         , MkqlItemType(mkqlItemType)
         , ColumnOrder(сolumnOrder)
@@ -199,7 +199,7 @@ private:
     THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults;
     TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders;
     TTxAllocatorState::TPtr AllocState;
-    mutable TPartitionedParamMap PartitionedParams; 
+    mutable TPartitionedParamMap PartitionedParams;
 
 public:
     using TPtr = std::shared_ptr<TQueryData>;
@@ -268,7 +268,7 @@ public:
                 std::tie(type, value) = *param;
                 return true;
             }
-            
+
 
             return false;
         };
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index c46f698bf9..6bd42cf4cd 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -392,6 +392,23 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
         CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0)));
     }
 
+    Y_UNIT_TEST(MaterializeTxResults) {
+        auto kikimr = DefaultKikimrRunner();
+        auto db = kikimr.GetQueryClient();
+
+        auto result = db.ExecuteQuery(R"(
+            DELETE FROM KeyValue;
+        )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+        result = db.ExecuteQuery(R"(
+            SELECT * FROM KeyValue;
+        )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+        CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+    }
+
     NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver, i32 tries = -1) {
         NYdb::NOperation::TOperationClient client(ydbDriver);
         NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op;
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 5d8f86f453..936680f952 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -259,7 +259,7 @@ message TKqpPhyCnStreamLookup {
 message TKqpPhyCnSequencer {
     TKqpPhyTableId Table = 1;
     repeated string Columns = 2;
-    repeated string AutoIncrementColumns = 3;  
+    repeated string AutoIncrementColumns = 3;
     bytes InputType = 4;
     bytes OutputType = 5;
 }
@@ -339,7 +339,7 @@ message TKqpPhyResult {
     NKikimrMiniKQL.TType ItemType = 2;
     bool IsStream = 3;
     repeated string ColumnHints = 4;
-    uint32 QueryResultIndex = 5;
+    optional uint32 QueryResultIndex = 5;
 }
 
 message TKqpSchemeOperation {
-- 
cgit v1.2.3