aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-12-30 16:54:32 +0300
committergvit <gvit@ydb.tech>2022-12-30 16:54:32 +0300
commit58ff0f051f57461c1377ac11c38b0e1804fe9b4f (patch)
tree1365e95723b1f3e74bf3bb8949dc00dae5137b39
parent5de12f5f8442209d2b18e94feb307cf054e5b542 (diff)
downloadydb-58ff0f051f57461c1377ac11c38b0e1804fe9b4f.tar.gz
don't stage useless mkql data in session actor
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h13
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp72
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.cpp18
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp7
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp9
-rw-r--r--ydb/core/kqp/gateway/kqp_query_data.cpp64
-rw-r--r--ydb/core/kqp/gateway/kqp_query_data.h27
-rw-r--r--ydb/core/kqp/runtime/kqp_transport.cpp59
-rw-r--r--ydb/core/kqp/runtime/kqp_transport.h2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp49
-rw-r--r--ydb/library/mkql_proto/mkql_proto.cpp30
-rw-r--r--ydb/library/mkql_proto/mkql_proto.h4
13 files changed, 154 insertions, 204 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 95c6b02d302..0cab182400d 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -12,15 +12,6 @@
namespace NKikimr {
namespace NKqp {
-struct TKqpExecuterTxResult {
- NKikimrMiniKQL::TType ItemType;
- NKikimr::NMiniKQL::TType* MkqlItemType;
- TVector<ui32> ColumnOrder;
- std::optional<NKikimrMiniKQL::TType> ResultItemType;
- NKikimr::NMiniKQL::TUnboxedValueVector Rows;
- bool IsStream = true;
-};
-
struct TEvKqpExecuter {
struct TEvTxRequest : public TEventPB<TEvTxRequest, NKikimrKqp::TEvExecuterTxRequest,
TKqpExecuterEvents::EvTxRequest> {};
@@ -42,10 +33,8 @@ struct TEvKqpExecuter {
~TEvTxResponse();
- TTypedUnboxedValueVector GetUnboxedValueResults();
- TVector<NKikimrMiniKQL::TResult>& GetMkqlResults();
+ TVector<TKqpExecuterTxResult>& GetTxResults() { return TxResults; }
void InitTxResult(const NKqpProto::TKqpPhyTx& tx);
- TTypedUnboxedValueVector Finalize();
void TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector& rows);
void TakeResult(ui32 idx, const NYql::NDqProto::TData& rows);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 55b63bf8976..7e58c735738 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -22,8 +22,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const NKqpProto::TKqpPhyTx& tx)
for (const auto& txResult : tx.GetResults()) {
auto& result = TxResults[i++];
result.IsStream = txResult.GetIsStream();
- result.ItemType.CopyFrom(txResult.GetItemType());
- result.MkqlItemType = ImportTypeFromProto(result.ItemType, AllocState->TypeEnv);
+ result.MkqlItemType = ImportTypeFromProto(txResult.GetItemType(), AllocState->TypeEnv);
if (txResult.ColumnHintsSize() > 0) {
result.ColumnOrder.reserve(txResult.GetColumnHints().size());
@@ -39,12 +38,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const NKqpProto::TKqpPhyTx& tx)
auto it = memberIndices.find(name);
YQL_ENSURE(it != memberIndices.end(), "undetermined column name: " << name);
result.ColumnOrder.push_back(it->second);
-
- auto* newMember = resultItemType.MutableStruct()->AddMember();
- newMember->SetName(name);
- ExportTypeToProto(structType->GetMemberType(it->second), *newMember->MutableType());
}
- result.ResultItemType.emplace(std::move(resultItemType));
}
}
}
@@ -81,70 +75,6 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnb
serializer.Deserialize(buffer, txResult.MkqlItemType, txResult.Rows);
}
-TTypedUnboxedValueVector TEvKqpExecuter::TEvTxResponse::GetUnboxedValueResults() {
- return Finalize();
-}
-
-TVector<NKikimrMiniKQL::TResult>& TEvKqpExecuter::TEvTxResponse::GetMkqlResults() {
- if (MkqlResults_.empty()) {
- Finalize();
- }
- return MkqlResults_;
-}
-
-TTypedUnboxedValueVector TEvKqpExecuter::TEvTxResponse::Finalize() {
- auto g = AllocState->TypeEnv.BindAllocator();
- MkqlResults_.resize(TxResults.size());
- ui32 idx = 0;
- for (auto& result : TxResults) {
- auto& mkqlResult = MkqlResults_[idx++];
- if (result.IsStream) {
- mkqlResult.MutableType()->SetKind(NKikimrMiniKQL::List);
- if (result.ResultItemType) {
- mkqlResult.MutableType()->MutableList()->MutableItem()->CopyFrom(
- *result.ResultItemType);
- } else {
- mkqlResult.MutableType()->MutableList()->MutableItem()->CopyFrom(
- result.ItemType);
- }
-
- for(auto& row: result.Rows) {
- ExportValueToProto(
- result.MkqlItemType, row, *mkqlResult.MutableValue()->AddList(),
- &result.ColumnOrder);
- }
-
- } else {
- YQL_ENSURE(result.Rows.size() == 1, "Actual buffer size: " << result.Rows.size());
- mkqlResult.MutableType()->CopyFrom(result.ItemType);
- ExportValueToProto(result.MkqlItemType, result.Rows[0], *mkqlResult.MutableValue());
- }
- }
-
- TTypedUnboxedValueVector UnboxedResult_;
-
- for(auto& txResult: TxResults) {
- auto* type = txResult.MkqlItemType;
- if (txResult.IsStream) {
- auto* listOfItemType = NKikimr::NMiniKQL::TListType::Create(type, AllocState->TypeEnv);
- NUdf::TUnboxedValue value = AllocState->HolderFactory.VectorAsArray(txResult.Rows);
- if (txResult.ResultItemType) {
- auto* type = ImportTypeFromProto(*txResult.ResultItemType, AllocState->TypeEnv);
- auto* listType = NKikimr::NMiniKQL::TListType::Create(type, AllocState->TypeEnv);
- UnboxedResult_.push_back(std::make_pair(listType, value));
- } else {
- UnboxedResult_.push_back(std::make_pair(listOfItemType, value));
- }
- } else {
- UnboxedResult_.push_back(std::make_pair(type, txResult.Rows[0]));
- }
- }
-
- TxResults.crop(0);
- return UnboxedResult_;
-}
-
-
void PrepareKqpTaskParameters(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, const TTask& task,
NDqProto::TDqTask& dqTask, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory&)
{
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
index d4e921f1392..3d83981ca7e 100644
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
@@ -119,11 +119,11 @@ private:
class TResultStreamChannelProxy : public TResultCommonChannelProxy {
public:
- TResultStreamChannelProxy(ui64 txId, ui64 channelId, const NKikimrMiniKQL::TType& itemType,
- const NKikimrMiniKQL::TType* resultItemType, TActorId target, TQueryExecutionStats* stats,
+ TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
+ const TVector<ui32>* columnOrder, TActorId target, TQueryExecutionStats* stats,
TActorId executer)
: TResultCommonChannelProxy(txId, channelId, stats, executer)
- , ResultItemType(resultItemType)
+ , ColumnOrder(columnOrder)
, ItemType(itemType)
, Target(target) {}
@@ -134,7 +134,7 @@ private:
auto& channelData = computeData.GetChannelData();
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
- auto resultSet = protoBuilder.BuildYdbResultSet({channelData.GetData()}, ItemType, ResultItemType);
+ auto resultSet = protoBuilder.BuildYdbResultSet({channelData.GetData()}, ItemType, ColumnOrder);
auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
streamEv->Record.SetSeqNo(computeData.GetSeqNo());
@@ -147,8 +147,8 @@ private:
Send(Target, streamEv.Release());
}
private:
- const NKikimrMiniKQL::TType* ResultItemType;
- const NKikimrMiniKQL::TType& ItemType;
+ const TVector<ui32>* ColumnOrder;
+ NKikimr::NMiniKQL::TType* ItemType;
const NActors::TActorId Target;
};
@@ -183,15 +183,15 @@ private:
} // anonymous namespace end
-NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, const NKikimrMiniKQL::TType& itemType,
- const NKikimrMiniKQL::TType* resultItemType, TActorId target, TQueryExecutionStats* stats, TActorId executer)
+NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
+ const TVector<ui32>* columnOrder, TActorId target, TQueryExecutionStats* stats, TActorId executer)
{
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
"CreateResultStreamChannelProxy: TxId: " << txId <<
", channelId: " << channelId
);
- return new TResultStreamChannelProxy(txId, channelId, itemType, resultItemType, target, stats, executer);
+ return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, target, stats, executer);
}
NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId,
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h
index 907d163a46c..5cc8c54fb00 100644
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.h
+++ b/ydb/core/kqp/executer_actor/kqp_result_channel.h
@@ -25,8 +25,8 @@ namespace NKikimr::NKqp {
struct TQueryExecutionStats;
struct TKqpExecuterTxResult;
-NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, const NKikimrMiniKQL::TType& itemType,
- const NKikimrMiniKQL::TType* resultItemType, NActors::TActorId target, TQueryExecutionStats* stats,
+NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
+ const TVector<ui32>* columnOrder, NActors::TActorId target, TQueryExecutionStats* stats,
NActors::TActorId executer);
NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats,
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 93382dae47f..2a5d0e102ef 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -733,11 +733,8 @@ public:
return ResultChannelProxies.begin()->second;
}
- auto resultType = ResponseEv->TxResults[0].ResultItemType ?
- &ResponseEv->TxResults[0].ResultItemType.value() : nullptr;
-
- proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].ItemType,
- resultType, Target, Stats.get(), SelfId());
+ proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].MkqlItemType,
+ &ResponseEv->TxResults[0].ColumnOrder, Target, Stats.get(), SelfId());
} else {
YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize());
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 351398b3df7..004b3fc7dd6 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -513,10 +513,13 @@ private:
result.ExecuterResult.Swap(response->MutableResult());
{
auto g = Parameters->TypeEnv().BindAllocator();
- auto unboxed = ev->Get()->GetUnboxedValueResults();
- Parameters->AddTxResults(std::move(unboxed));
+ auto& txResults = ev->Get()->GetTxResults();
+ result.Results.reserve(txResults.size());
+ for(auto& tx : txResults) {
+ result.Results.emplace_back(std::move(tx.GetMkql()));
+ }
+ Parameters->AddTxResults(std::move(txResults));
}
- result.Results = std::move(ev->Get()->GetMkqlResults());
Promise.SetValue(std::move(result));
this->PassAway();
}
diff --git a/ydb/core/kqp/gateway/kqp_query_data.cpp b/ydb/core/kqp/gateway/kqp_query_data.cpp
index 5af63049b25..f68c1545436 100644
--- a/ydb/core/kqp/gateway/kqp_query_data.cpp
+++ b/ydb/core/kqp/gateway/kqp_query_data.cpp
@@ -13,6 +13,53 @@ using namespace NKikimr::NMiniKQL;
using namespace NYql;
using namespace NYql::NUdf;
+TTypedUnboxedValue TKqpExecuterTxResult::GetUV(
+ const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& factory)
+{
+ if (IsStream) {
+ auto* listOfItemType = NKikimr::NMiniKQL::TListType::Create(MkqlItemType, typeEnv);
+ NUdf::TUnboxedValue value = factory.VectorAsArray(Rows);
+ return {listOfItemType, value};
+ } else {
+ YQL_ENSURE(Rows.size() == 1, "Actual buffer size: " << Rows.size());
+ return {MkqlItemType, Rows[0]};
+ }
+}
+
+NKikimrMiniKQL::TResult* TKqpExecuterTxResult::GetMkql(google::protobuf::Arena* arena) {
+ NKikimrMiniKQL::TResult* mkqlResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(arena);
+ FillMkql(mkqlResult);
+ return mkqlResult;
+}
+
+NKikimrMiniKQL::TResult TKqpExecuterTxResult::GetMkql() {
+ NKikimrMiniKQL::TResult mkqlResult;
+ FillMkql(&mkqlResult);
+ return mkqlResult;
+}
+
+void TKqpExecuterTxResult::FillMkql(NKikimrMiniKQL::TResult* mkqlResult) {
+ if (IsStream) {
+ mkqlResult->MutableType()->SetKind(NKikimrMiniKQL::List);
+ ExportTypeToProto(
+ MkqlItemType,
+ *mkqlResult->MutableType()->MutableList()->MutableItem(),
+ &ColumnOrder);
+
+ for(auto& row: Rows) {
+ ExportValueToProto(
+ MkqlItemType, row, *mkqlResult->MutableValue()->AddList(),
+ &ColumnOrder);
+ }
+
+ } else {
+ YQL_ENSURE(Rows.size() == 1, "Actual buffer size: " << Rows.size());
+ ExportTypeToProto(MkqlItemType, *mkqlResult->MutableType());
+ ExportValueToProto(MkqlItemType, Rows[0], *mkqlResult->MutableValue());
+ }
+}
+
TTxAllocatorState::TTxAllocatorState(const IFunctionRegistry* functionRegistry,
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider)
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators())
@@ -84,7 +131,7 @@ TQueryData::TQueryData(TTxAllocatorState::TPtr allocatorState)
TQueryData::~TQueryData() {
{
auto g = TypeEnv().BindAllocator();
- TTxResultVector emptyVector;
+ TVector<TVector<TKqpExecuterTxResult>> emptyVector;
TxResults.swap(emptyVector);
TUnboxedParamsMap emptyMap;
UnboxedData.swap(emptyMap);
@@ -108,6 +155,19 @@ NKikimr::NMiniKQL::TType* TQueryData::GetParameterType(const TString& name) {
return it->second.first;
}
+std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> TQueryData::GetTxResult(ui32 txIndex, ui32 resultIndex) {
+ return TxResults[txIndex][resultIndex].GetUV(
+ TypeEnv(), AllocState->HolderFactory);
+}
+
+NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(ui32 txIndex, ui32 resultIndex, google::protobuf::Arena* arena) {
+ return TxResults[txIndex][resultIndex].GetMkql(arena);
+}
+
+void TQueryData::AddTxResults(TVector<TKqpExecuterTxResult>&& results) {
+ TxResults.emplace_back(std::move(results));
+}
+
bool TQueryData::AddUVParam(const TString& name, NKikimr::NMiniKQL::TType* type, const NUdf::TUnboxedValue& value) {
auto g = TypeEnv().BindAllocator();
auto [_, success] = UnboxedData.emplace(name, std::make_pair(type, value));
@@ -216,7 +276,7 @@ void TQueryData::Clear() {
Params.clear();
TUnboxedParamsMap emptyMap;
UnboxedData.swap(emptyMap);
- TTxResultVector emptyVector;
+ TVector<TVector<TKqpExecuterTxResult>> emptyVector;
TxResults.swap(emptyVector);
AllocState->Reset();
}
diff --git a/ydb/core/kqp/gateway/kqp_query_data.h b/ydb/core/kqp/gateway/kqp_query_data.h
index c70bdcf70e6..fdbd41af47a 100644
--- a/ydb/core/kqp/gateway/kqp_query_data.h
+++ b/ydb/core/kqp/gateway/kqp_query_data.h
@@ -10,6 +10,7 @@
#include <util/generic/ptr.h>
#include <util/generic/guid.h>
+#include <google/protobuf/arena.h>
#include <unordered_map>
#include <vector>
@@ -51,6 +52,19 @@ using TTxResultVector = std::vector<
NKikimr::NMiniKQL::TMKQLAllocator<TTypedUnboxedValueVector>
>;
+struct TKqpExecuterTxResult {
+ NKikimr::NMiniKQL::TType* MkqlItemType;
+ TVector<ui32> ColumnOrder;
+ NKikimr::NMiniKQL::TUnboxedValueVector Rows;
+ bool IsStream = true;
+
+ TTypedUnboxedValue GetUV(const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& factory);
+ NKikimrMiniKQL::TResult* GetMkql(google::protobuf::Arena* arena);
+ NKikimrMiniKQL::TResult GetMkql();
+ void FillMkql(NKikimrMiniKQL::TResult* mkqlResult);
+};
+
struct TTimeAndRandomProvider {
TIntrusivePtr<ITimeProvider> TimeProvider;
TIntrusivePtr<IRandomProvider> RandomProvider;
@@ -136,7 +150,7 @@ private:
TParamMap Params;
TUnboxedParamsMap UnboxedData;
- TTxResultVector TxResults;
+ TVector<TVector<TKqpExecuterTxResult>> TxResults;
TTxAllocatorState::TPtr AllocState;
public:
@@ -158,11 +172,7 @@ public:
bool AddTypedValueParam(const TString& name, const Ydb::TypedValue& p);
bool MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyParamBinding& paramBinding);
-
- void AddTxResults(TTypedUnboxedValueVector&& unboxedResults) {
- auto g = TypeEnv().BindAllocator();
- TxResults.emplace_back(unboxedResults);
- }
+ void AddTxResults(TVector<NKikimr::NKqp::TKqpExecuterTxResult>&& results);
bool HasResult(ui32 txIndex, ui32 resultIndex) {
if (txIndex >= TxResults.size())
@@ -171,9 +181,8 @@ public:
return resultIndex < TxResults[txIndex].size();
}
- std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetTxResult(ui32 txIndex, ui32 resultIndex) {
- return TxResults[txIndex][resultIndex];
- }
+ TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex);
+ NKikimrMiniKQL::TResult* GetMkqlTxResult(ui32 txIndex, ui32 resultIndex, google::protobuf::Arena* arena);
std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding);
TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name);
diff --git a/ydb/core/kqp/runtime/kqp_transport.cpp b/ydb/core/kqp/runtime/kqp_transport.cpp
index 411acd810de..6ee6d5163e8 100644
--- a/ydb/core/kqp/runtime/kqp_transport.cpp
+++ b/ydb/core/kqp/runtime/kqp_transport.cpp
@@ -46,30 +46,21 @@ TKqpProtoBuilder::~TKqpProtoBuilder() {
}
}
-Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(const TVector<NDqProto::TData>& data,
- const NKikimrMiniKQL::TType& srcRowType, const NKikimrMiniKQL::TType* dstRowType)
+Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(
+ const TVector<NDqProto::TData>& data,
+ NKikimr::NMiniKQL::TType* mkqlSrcRowType,
+ const TVector<ui32>* columnOrder)
{
- YQL_ENSURE(srcRowType.GetKind() == NKikimrMiniKQL::Struct);
-
- auto* mkqlSrcRowType = NMiniKQL::ImportTypeFromProto(srcRowType, *TypeEnv);
- auto* mkqlSrcRowStructType = static_cast<TStructType*>(mkqlSrcRowType);
+ YQL_ENSURE(mkqlSrcRowType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct);
+ const auto* mkqlSrcRowStructType = static_cast<const TStructType*>(mkqlSrcRowType);
Ydb::ResultSet resultSet;
- if (dstRowType) {
- YQL_ENSURE(dstRowType->GetKind() == NKikimrMiniKQL::Struct);
-
- for (auto& member : dstRowType->GetStruct().GetMember()) {
- auto* column = resultSet.add_columns();
- column->set_name(member.GetName());
- ConvertMiniKQLTypeToYdbType(member.GetType(), *column->mutable_type());
- }
- } else {
- for (auto& member : srcRowType.GetStruct().GetMember()) {
- auto* column = resultSet.add_columns();
- column->set_name(member.GetName());
- ConvertMiniKQLTypeToYdbType(member.GetType(), *column->mutable_type());
- }
+ for (ui32 idx = 0; idx < mkqlSrcRowStructType->GetMembersCount(); ++idx) {
+ auto* column = resultSet.add_columns();
+ ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? idx : (*columnOrder)[idx];
+ column->set_name(TString(mkqlSrcRowStructType->GetMemberName(memberIndex)));
+ ExportTypeToProto(mkqlSrcRowStructType->GetMemberType(memberIndex), *column->mutable_type());
}
THolder<TGuard<TScopedAlloc>> guard;
@@ -79,39 +70,15 @@ Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(const TVector<NDqProto::TData
auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
if (!data.empty()) {
- switch (data.front().GetTransportVersion()) {
- case 10000: {
- transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_YSON_1_0;
- break;
- }
- case 20000: {
- transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
- break;
- }
- case 30000: {
- transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_ARROW_1_0;
- break;
- }
- default:
- transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
- }
+ transportVersion = static_cast<NDqProto::EDataTransportVersion>(data.front().GetTransportVersion());
}
NDq::TDqDataSerializer dataSerializer(*TypeEnv, *HolderFactory, transportVersion);
-
for (auto& part : data) {
if (part.GetRows()) {
TUnboxedValueVector rows;
dataSerializer.Deserialize(part, mkqlSrcRowType, rows);
-
- TVector<ui32> columnOrder;
- if (dstRowType) {
- for(auto& dstMember: dstRowType->GetStruct().GetMember()) {
- columnOrder.push_back(mkqlSrcRowStructType->GetMemberIndex(dstMember.GetName()));
- }
- }
-
for (auto& row : rows) {
- ExportValueToProto(mkqlSrcRowType, row, *resultSet.add_rows(), &columnOrder);
+ ExportValueToProto(mkqlSrcRowType, row, *resultSet.add_rows(), columnOrder);
}
}
}
diff --git a/ydb/core/kqp/runtime/kqp_transport.h b/ydb/core/kqp/runtime/kqp_transport.h
index 8de38dd698a..989aea6e3f6 100644
--- a/ydb/core/kqp/runtime/kqp_transport.h
+++ b/ydb/core/kqp/runtime/kqp_transport.h
@@ -20,7 +20,7 @@ public:
~TKqpProtoBuilder();
Ydb::ResultSet BuildYdbResultSet(const TVector<NYql::NDqProto::TData>& data,
- const NKikimrMiniKQL::TType& srcRowType, const NKikimrMiniKQL::TType* dstRowType = nullptr);
+ NKikimr::NMiniKQL::TType* srcRowType, const TVector<ui32>* columnOrder = nullptr);
private:
NMiniKQL::TScopedAlloc* Alloc = nullptr;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index f50355f06cd..b9e1afcc8cb 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -134,8 +134,7 @@ struct TKqpQueryState {
TKqpCompileResult::TConstPtr CompileResult;
NKqpProto::TKqpStatsCompile CompileStats;
TIntrusivePtr<TKqpTransactionContext> TxCtx;
- TQueryData::TPtr Parameters;
- TVector<TVector<NKikimrMiniKQL::TResult>> TxResults;
+ TQueryData::TPtr QueryData;
TActorId RequestActorId;
@@ -380,7 +379,7 @@ public:
return;
}
QueryState->TxCtx = std::move(txCtx);
- QueryState->Parameters = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
+ QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxId = txId;
QueryState->TxId_Human = txControl.tx_id();
if (!CheckTransacionLocks()) {
@@ -811,7 +810,7 @@ public:
QueryState->TxId = UlidGen.Next();
QueryState->TxId_Human = QueryState->TxId.ToString();
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider);
- QueryState->Parameters = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
+ QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxCtx->SetIsolationLevel(settings);
CreateNewTx();
@@ -849,7 +848,7 @@ public:
return false;
}
QueryState->TxCtx = *it;
- QueryState->Parameters = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
+ QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxId = txId;
QueryState->TxId_Human = txControl.tx_id();
break;
@@ -866,7 +865,7 @@ public:
} else {
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider);
- QueryState->Parameters = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
+ QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
}
@@ -915,7 +914,7 @@ public:
for(const auto& [name, param] : params) {
try {
- auto success = QueryState->Parameters->AddTypedValueParam(name, param);
+ auto success = QueryState->QueryData->AddTypedValueParam(name, param);
YQL_ENSURE(success, "Duplicate parameter: " << name);
} catch(const yexception& ex) {
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
@@ -933,7 +932,7 @@ public:
for (ui32 i = 0; i < structType.MemberSize(); ++i) {
const auto& memberName = structType.GetMember(i).GetName();
YQL_ENSURE(i < parameters.GetValue().StructSize(), "Missing value for parameter: " << memberName);
- auto success = QueryState->Parameters->AddMkqlParam(memberName, structType.GetMember(i).GetType(), parameters.GetValue().GetStruct(i));
+ auto success = QueryState->QueryData->AddMkqlParam(memberName, structType.GetMember(i).GetType(), parameters.GetValue().GetStruct(i));
YQL_ENSURE(success, "Duplicate parameter: " << memberName);
}
}
@@ -1005,11 +1004,11 @@ public:
void ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) {
auto& txCtx = QueryState->TxCtx;
YQL_ENSURE(txCtx);
- auto parameterType = QueryState->Parameters->GetParameterType(name);
+ auto parameterType = QueryState->QueryData->GetParameterType(name);
if (!parameterType) {
if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) {
NKikimrMiniKQL::TValue value;
- QueryState->Parameters->AddMkqlParam(name, type, value);
+ QueryState->QueryData->AddMkqlParam(name, type, value);
return;
}
@@ -1030,12 +1029,12 @@ public:
try {
for(const auto& paramBinding: tx.GetParamBindings()) {
- QueryState->Parameters->MaterializeParamValue(true, paramBinding);
+ QueryState->QueryData->MaterializeParamValue(true, paramBinding);
}
} catch (const yexception& ex) {
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
}
- return QueryState->Parameters;
+ return QueryState->QueryData;
}
bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query) {
@@ -1073,9 +1072,9 @@ public:
TQueryData::TPtr CreateKqpValueMap(const NKqpProto::TKqpPhyTx& tx) {
for (const auto& paramBinding : tx.GetParamBindings()) {
- QueryState->Parameters->MaterializeParamValue(true, paramBinding);
+ QueryState->QueryData->MaterializeParamValue(true, paramBinding);
}
- return QueryState->Parameters;
+ return QueryState->QueryData;
}
bool CheckTransacionLocks() {
@@ -1389,12 +1388,9 @@ public:
auto& executerResults = *response->MutableResult();
{
- auto g = QueryState->Parameters->TypeEnv().BindAllocator();
- auto unboxed = ev->Get()->GetUnboxedValueResults();
- QueryState->Parameters->AddTxResults(std::move(unboxed));
+ auto g = QueryState->QueryData->TypeEnv().BindAllocator();
+ QueryState->QueryData->AddTxResults(std::move(ev->Get()->GetTxResults()));
}
- auto& txResult = ev->Get()->GetMkqlResults();
- QueryState->TxResults.emplace_back(std::move(txResult));
if (ev->Get()->LockHandle) {
QueryState->TxCtx->Locks.LockHandle = std::move(ev->Get()->LockHandle);
@@ -1646,10 +1642,9 @@ public:
auto txIndex = rb.GetTxResultBinding().GetTxIndex();
auto resultIndex = rb.GetTxResultBinding().GetResultIndex();
- auto& txResults = QueryState->TxResults;
- YQL_ENSURE(txIndex < txResults.size());
- YQL_ENSURE(resultIndex < txResults[txIndex].size());
-
+ YQL_ENSURE(QueryState->QueryData->HasResult(txIndex, resultIndex));
+ auto g = QueryState->QueryData->TypeEnv().BindAllocator();
+ auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, arena.get());
std::optional<IDataProvider::TFillSettings> fillSettings;
if (QueryState->PreparedQuery->ResultsSize()) {
YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""
@@ -1660,13 +1655,11 @@ public:
fillSettings->RowsLimitPerWrite = result.GetRowsLimit();
}
}
-
- auto* protoRes = KikimrResultToProto(txResults[txIndex][resultIndex], {},
- fillSettings.value_or(FillSettings), arena.get());
+ auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), arena.get());
if (useYdbResponseFormat) {
- ConvertKqpQueryResultToDbResult(*protoRes, response->AddYdbResults());
+ ConvertKqpQueryResultToDbResult(*finalResult, response->AddYdbResults());
} else {
- response->AddResults()->Swap(protoRes);
+ response->AddResults()->Swap(finalResult);
}
}
}
diff --git a/ydb/library/mkql_proto/mkql_proto.cpp b/ydb/library/mkql_proto/mkql_proto.cpp
index 62708363a01..e0ab361fb95 100644
--- a/ydb/library/mkql_proto/mkql_proto.cpp
+++ b/ydb/library/mkql_proto/mkql_proto.cpp
@@ -19,7 +19,7 @@ namespace NKikimr::NMiniKQL {
namespace {
-void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res);
+void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res, const TVector<ui32>* columnOrder = nullptr);
Y_FORCE_INLINE void HandleKindDataExport(const TType* type, const NUdf::TUnboxedValuePod& value, Ydb::Value& res) {
auto dataType = static_cast<const TDataType*>(type);
@@ -114,11 +114,12 @@ Y_FORCE_INLINE void HandleKindDataExport(const TType* type, const NUdf::TUnboxed
}
template<typename TOut>
-Y_FORCE_INLINE void ExportStructTypeToProto(TStructType* structType, TOut& res) {
+Y_FORCE_INLINE void ExportStructTypeToProto(TStructType* structType, TOut& res, const TVector<ui32>* columnOrder) {
for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
auto newMember = res.AddMember();
- newMember->SetName(TString(structType->GetMemberName(index)));
- ExportTypeToProtoImpl(structType->GetMemberType(index), *newMember->MutableType());
+ ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? index : (*columnOrder)[index];
+ newMember->SetName(TString(structType->GetMemberName(memberIndex)));
+ ExportTypeToProtoImpl(structType->GetMemberType(memberIndex), *newMember->MutableType());
}
}
@@ -129,7 +130,7 @@ Y_FORCE_INLINE void ExportTupleTypeToProto(TTupleType* tupleType, TOut& res) {
}
}
-void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) {
+void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res, const TVector<ui32>* columnOrder) {
switch (type->GetKind()) {
case TType::EKind::Void:
res.SetKind(NKikimrMiniKQL::ETypeKind::Void);
@@ -187,7 +188,7 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) {
auto structType = static_cast<TStructType *>(type);
res.SetKind(NKikimrMiniKQL::ETypeKind::Struct);
if (structType->GetMembersCount()) {
- ExportStructTypeToProto(structType, *res.MutableStruct());
+ ExportStructTypeToProto(structType, *res.MutableStruct(), columnOrder);
}
break;
}
@@ -217,7 +218,7 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) {
TStructType *structType = static_cast<TStructType *>(innerType);
auto resItems = res.MutableVariant()->MutableStructItems();
if (structType->GetMembersCount()) {
- ExportStructTypeToProto(structType, *resItems);
+ ExportStructTypeToProto(structType, *resItems, nullptr);
}
} else if (innerType->IsTuple()) {
auto resItems = res.MutableVariant()->MutableTupleItems();
@@ -237,7 +238,7 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) {
}
}
-void ExportTypeToProtoImpl(TType* type, Ydb::Type& res) {
+void ExportTypeToProtoImpl(TType* type, Ydb::Type& res, const TVector<ui32>* columnOrder = nullptr) {
switch (type->GetKind()) {
case TType::EKind::Void:
res.set_void_type(::google::protobuf::NULL_VALUE);
@@ -303,8 +304,9 @@ void ExportTypeToProtoImpl(TType* type, Ydb::Type& res) {
auto resStruct = res.mutable_struct_type();
for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
auto newMember = resStruct->add_members();
- newMember->set_name(TString(structType->GetMemberName(index)));
- ExportTypeToProtoImpl(structType->GetMemberType(index), *newMember->mutable_type());
+ ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? index : (*columnOrder)[index];
+ newMember->set_name(TString(structType->GetMemberName(memberIndex)));
+ ExportTypeToProtoImpl(structType->GetMemberType(memberIndex), *newMember->mutable_type());
}
break;
@@ -803,8 +805,8 @@ void ExportPrimitiveTypeToProto(ui32 schemeType, Ydb::Type& output) {
}
}
-void ExportTypeToProto(TType* type, Ydb::Type& res) {
- ExportTypeToProtoImpl(type, res);
+void ExportTypeToProto(TType* type, Ydb::Type& res, const TVector<ui32>* columnOrder) {
+ ExportTypeToProtoImpl(type, res, columnOrder);
}
void ExportValueToProto(TType* type, const NUdf::TUnboxedValuePod& value, Ydb::Value& res, const TVector<ui32>* columnOrder) {
@@ -1267,8 +1269,8 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
MKQL_ENSURE(false, TStringBuilder() << "Unknown protobuf type: " << type->GetKindAsStr());
}
-void ExportTypeToProto(TType* type, NKikimrMiniKQL::TType& res) {
- ExportTypeToProtoImpl(type, res);
+void ExportTypeToProto(TType* type, NKikimrMiniKQL::TType& res, const TVector<ui32>* columnOrder) {
+ ExportTypeToProtoImpl(type, res, columnOrder);
}
void ExportValueToProto(TType* type, const NUdf::TUnboxedValuePod& value, NKikimrMiniKQL::TValue& res, const TVector<ui32>* columnOrder) {
diff --git a/ydb/library/mkql_proto/mkql_proto.h b/ydb/library/mkql_proto/mkql_proto.h
index f7c0062843c..3514d2a297b 100644
--- a/ydb/library/mkql_proto/mkql_proto.h
+++ b/ydb/library/mkql_proto/mkql_proto.h
@@ -9,12 +9,12 @@ namespace NKikimr::NMiniKQL {
class THolderFactory;
-void ExportTypeToProto(TType* type, NKikimrMiniKQL::TType& res);
+void ExportTypeToProto(TType* type, NKikimrMiniKQL::TType& res, const TVector<ui32>* columnOrder = nullptr);
void ExportValueToProto(TType* type, const NUdf::TUnboxedValuePod& value, NKikimrMiniKQL::TValue& res, const TVector<ui32>* columnOrder = nullptr);
void ExportPrimitiveTypeToProto(ui32 schemeType, Ydb::Type& output);
-void ExportTypeToProto(TType* type, Ydb::Type& res);
+void ExportTypeToProto(TType* type, Ydb::Type& res, const TVector<ui32>* columnOrder = nullptr);
void ExportValueToProto(TType* type, const NUdf::TUnboxedValuePod& value, Ydb::Value& res, const TVector<ui32>* columnOrder = nullptr);