aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-08-03 13:24:08 +0300
committerspuchin <spuchin@ydb.tech>2023-08-03 13:24:08 +0300
commitfef1ffbaef82af08b6201f41c2ed15723a4fe1cf (patch)
treede206b5f4d0970d2a8392249a799f48f8050f8c4
parenta31dbe76e19c44ca466f2fb326fbfe4f3f4a95b3 (diff)
downloadydb-fef1ffbaef82af08b6201f41c2ed15723a4fe1cf.tar.gz
Fix incorrect QueryResultIndex handling in executer result proxy. (KIKIMR-18919)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp7
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h6
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.h8
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp17
-rw-r--r--ydb/core/protos/kqp_physical.proto4
5 files changed, 32 insertions, 10 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 5be1366333..24d05ae129 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -23,8 +23,13 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt
const auto& result = tx->GetResults(i);
const auto& resultMeta = tx->GetTxResultsMeta()[i];
+ TMaybe<ui32> queryResultIndex;
+ if (result.HasQueryResultIndex()) {
+ queryResultIndex = result.GetQueryResultIndex();
+ }
+
TxResults.emplace_back(result.GetIsStream(), resultMeta.MkqlItemType, &resultMeta.ColumnOrder,
- result.GetQueryResultIndex());
+ queryResultIndex);
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 31092da6b8..cfdf9c94cc 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -780,7 +780,7 @@ protected:
input.ConnectionInfo = NYql::NDq::TSourceInput{};
// allocating source settings
-
+
input.Meta.SourceSettings = TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpReadRangesSourceSettings>();
NKikimrTxDataShard::TKqpReadRangesSourceSettings* settings = input.Meta.SourceSettings;
FillTableMeta(stageInfo, settings->MutableTable());
@@ -1026,9 +1026,9 @@ protected:
const auto& txResult = ResponseEv->TxResults[channel.DstInputIndex];
IActor* proxy;
- if (txResult.IsStream) {
+ if (txResult.IsStream && txResult.QueryResultIndex.Defined()) {
proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType,
- txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats, this->SelfId());
+ txResult.ColumnOrder, *txResult.QueryResultIndex, Target, Stats, this->SelfId());
} else {
proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats, this->SelfId(),
channel.DstInputIndex, ResponseEv.get());
diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index aa1f8a2337..21c2f61608 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -79,14 +79,14 @@ struct TKqpExecuterTxResult {
bool IsStream = true;
NKikimr::NMiniKQL::TType* MkqlItemType;
const TVector<ui32>* ColumnOrder = nullptr;
- ui32 QueryResultIndex = 0;
+ TMaybe<ui32> QueryResultIndex = 0;
NKikimr::NMiniKQL::TUnboxedValueBatch Rows;
explicit TKqpExecuterTxResult(
bool isStream,
NKikimr::NMiniKQL::TType* mkqlItemType,
const TVector<ui32>* сolumnOrder,
- ui32 queryResultIndex)
+ const TMaybe<ui32>& queryResultIndex)
: IsStream(isStream)
, MkqlItemType(mkqlItemType)
, ColumnOrder(сolumnOrder)
@@ -199,7 +199,7 @@ private:
THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults;
TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders;
TTxAllocatorState::TPtr AllocState;
- mutable TPartitionedParamMap PartitionedParams;
+ mutable TPartitionedParamMap PartitionedParams;
public:
using TPtr = std::shared_ptr<TQueryData>;
@@ -268,7 +268,7 @@ public:
std::tie(type, value) = *param;
return true;
}
-
+
return false;
};
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index c46f698bf9..6bd42cf4cd 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -392,6 +392,23 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0)));
}
+ Y_UNIT_TEST(MaterializeTxResults) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ DELETE FROM KeyValue;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ result = db.ExecuteQuery(R"(
+ SELECT * FROM KeyValue;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver, i32 tries = -1) {
NYdb::NOperation::TOperationClient client(ydbDriver);
NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op;
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 5d8f86f453..936680f952 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -259,7 +259,7 @@ message TKqpPhyCnStreamLookup {
message TKqpPhyCnSequencer {
TKqpPhyTableId Table = 1;
repeated string Columns = 2;
- repeated string AutoIncrementColumns = 3;
+ repeated string AutoIncrementColumns = 3;
bytes InputType = 4;
bytes OutputType = 5;
}
@@ -339,7 +339,7 @@ message TKqpPhyResult {
NKikimrMiniKQL.TType ItemType = 2;
bool IsStream = 3;
repeated string ColumnHints = 4;
- uint32 QueryResultIndex = 5;
+ optional uint32 QueryResultIndex = 5;
}
message TKqpSchemeOperation {