diff options
author | spuchin <spuchin@ydb.tech> | 2023-08-03 13:24:08 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-08-03 13:24:08 +0300 |
commit | fef1ffbaef82af08b6201f41c2ed15723a4fe1cf (patch) | |
tree | de206b5f4d0970d2a8392249a799f48f8050f8c4 | |
parent | a31dbe76e19c44ca466f2fb326fbfe4f3f4a95b3 (diff) | |
download | ydb-fef1ffbaef82af08b6201f41c2ed15723a4fe1cf.tar.gz |
Fix incorrect QueryResultIndex handling in executer result proxy. (KIKIMR-18919)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 17 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 4 |
5 files changed, 32 insertions, 10 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 5be1366333..24d05ae129 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -23,8 +23,13 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt const auto& result = tx->GetResults(i); const auto& resultMeta = tx->GetTxResultsMeta()[i]; + TMaybe<ui32> queryResultIndex; + if (result.HasQueryResultIndex()) { + queryResultIndex = result.GetQueryResultIndex(); + } + TxResults.emplace_back(result.GetIsStream(), resultMeta.MkqlItemType, &resultMeta.ColumnOrder, - result.GetQueryResultIndex()); + queryResultIndex); } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 31092da6b8..cfdf9c94cc 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -780,7 +780,7 @@ protected: input.ConnectionInfo = NYql::NDq::TSourceInput{}; // allocating source settings - + input.Meta.SourceSettings = TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(); NKikimrTxDataShard::TKqpReadRangesSourceSettings* settings = input.Meta.SourceSettings; FillTableMeta(stageInfo, settings->MutableTable()); @@ -1026,9 +1026,9 @@ protected: const auto& txResult = ResponseEv->TxResults[channel.DstInputIndex]; IActor* proxy; - if (txResult.IsStream) { + if (txResult.IsStream && txResult.QueryResultIndex.Defined()) { proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType, - txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats, this->SelfId()); + txResult.ColumnOrder, *txResult.QueryResultIndex, Target, Stats, this->SelfId()); } else { proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats, this->SelfId(), channel.DstInputIndex, ResponseEv.get()); diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index aa1f8a2337..21c2f61608 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -79,14 +79,14 @@ struct TKqpExecuterTxResult { bool IsStream = true; NKikimr::NMiniKQL::TType* MkqlItemType; const TVector<ui32>* ColumnOrder = nullptr; - ui32 QueryResultIndex = 0; + TMaybe<ui32> QueryResultIndex = 0; NKikimr::NMiniKQL::TUnboxedValueBatch Rows; explicit TKqpExecuterTxResult( bool isStream, NKikimr::NMiniKQL::TType* mkqlItemType, const TVector<ui32>* сolumnOrder, - ui32 queryResultIndex) + const TMaybe<ui32>& queryResultIndex) : IsStream(isStream) , MkqlItemType(mkqlItemType) , ColumnOrder(сolumnOrder) @@ -199,7 +199,7 @@ private: THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults; TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders; TTxAllocatorState::TPtr AllocState; - mutable TPartitionedParamMap PartitionedParams; + mutable TPartitionedParamMap PartitionedParams; public: using TPtr = std::shared_ptr<TQueryData>; @@ -268,7 +268,7 @@ public: std::tie(type, value) = *param; return true; } - + return false; }; diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index c46f698bf9..6bd42cf4cd 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -392,6 +392,23 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0))); } + Y_UNIT_TEST(MaterializeTxResults) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + DELETE FROM KeyValue; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM KeyValue; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + } + NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver, i32 tries = -1) { NYdb::NOperation::TOperationClient client(ydbDriver); NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op; diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 5d8f86f453..936680f952 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -259,7 +259,7 @@ message TKqpPhyCnStreamLookup { message TKqpPhyCnSequencer { TKqpPhyTableId Table = 1; repeated string Columns = 2; - repeated string AutoIncrementColumns = 3; + repeated string AutoIncrementColumns = 3; bytes InputType = 4; bytes OutputType = 5; } @@ -339,7 +339,7 @@ message TKqpPhyResult { NKikimrMiniKQL.TType ItemType = 2; bool IsStream = 3; repeated string ColumnHints = 4; - uint32 QueryResultIndex = 5; + optional uint32 QueryResultIndex = 5; } message TKqpSchemeOperation { |