diff options
author | gvit <gvit@ydb.tech> | 2022-12-30 16:54:32 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-12-30 16:54:32 +0300 |
commit | 58ff0f051f57461c1377ac11c38b0e1804fe9b4f (patch) | |
tree | 1365e95723b1f3e74bf3bb8949dc00dae5137b39 | |
parent | 5de12f5f8442209d2b18e94feb307cf054e5b542 (diff) | |
download | ydb-58ff0f051f57461c1377ac11c38b0e1804fe9b4f.tar.gz |
don't stage useless mkql data in session actor
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 72 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_result_channel.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_result_channel.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_query_data.cpp | 64 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_query_data.h | 27 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_transport.cpp | 59 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_transport.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 49 | ||||
-rw-r--r-- | ydb/library/mkql_proto/mkql_proto.cpp | 30 | ||||
-rw-r--r-- | ydb/library/mkql_proto/mkql_proto.h | 4 |
13 files changed, 154 insertions, 204 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 95c6b02d302..0cab182400d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -12,15 +12,6 @@ namespace NKikimr { namespace NKqp { -struct TKqpExecuterTxResult { - NKikimrMiniKQL::TType ItemType; - NKikimr::NMiniKQL::TType* MkqlItemType; - TVector<ui32> ColumnOrder; - std::optional<NKikimrMiniKQL::TType> ResultItemType; - NKikimr::NMiniKQL::TUnboxedValueVector Rows; - bool IsStream = true; -}; - struct TEvKqpExecuter { struct TEvTxRequest : public TEventPB<TEvTxRequest, NKikimrKqp::TEvExecuterTxRequest, TKqpExecuterEvents::EvTxRequest> {}; @@ -42,10 +33,8 @@ struct TEvKqpExecuter { ~TEvTxResponse(); - TTypedUnboxedValueVector GetUnboxedValueResults(); - TVector<NKikimrMiniKQL::TResult>& GetMkqlResults(); + TVector<TKqpExecuterTxResult>& GetTxResults() { return TxResults; } void InitTxResult(const NKqpProto::TKqpPhyTx& tx); - TTypedUnboxedValueVector Finalize(); void TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector& rows); void TakeResult(ui32 idx, const NYql::NDqProto::TData& rows); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 55b63bf8976..7e58c735738 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -22,8 +22,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const NKqpProto::TKqpPhyTx& tx) for (const auto& txResult : tx.GetResults()) { auto& result = TxResults[i++]; result.IsStream = txResult.GetIsStream(); - result.ItemType.CopyFrom(txResult.GetItemType()); - result.MkqlItemType = ImportTypeFromProto(result.ItemType, AllocState->TypeEnv); + result.MkqlItemType = ImportTypeFromProto(txResult.GetItemType(), AllocState->TypeEnv); if (txResult.ColumnHintsSize() > 0) { result.ColumnOrder.reserve(txResult.GetColumnHints().size()); @@ -39,12 +38,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const NKqpProto::TKqpPhyTx& tx) auto it = memberIndices.find(name); YQL_ENSURE(it != memberIndices.end(), "undetermined column name: " << name); result.ColumnOrder.push_back(it->second); - - auto* newMember = resultItemType.MutableStruct()->AddMember(); - newMember->SetName(name); - ExportTypeToProto(structType->GetMemberType(it->second), *newMember->MutableType()); } - result.ResultItemType.emplace(std::move(resultItemType)); } } } @@ -81,70 +75,6 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnb serializer.Deserialize(buffer, txResult.MkqlItemType, txResult.Rows); } -TTypedUnboxedValueVector TEvKqpExecuter::TEvTxResponse::GetUnboxedValueResults() { - return Finalize(); -} - -TVector<NKikimrMiniKQL::TResult>& TEvKqpExecuter::TEvTxResponse::GetMkqlResults() { - if (MkqlResults_.empty()) { - Finalize(); - } - return MkqlResults_; -} - -TTypedUnboxedValueVector TEvKqpExecuter::TEvTxResponse::Finalize() { - auto g = AllocState->TypeEnv.BindAllocator(); - MkqlResults_.resize(TxResults.size()); - ui32 idx = 0; - for (auto& result : TxResults) { - auto& mkqlResult = MkqlResults_[idx++]; - if (result.IsStream) { - mkqlResult.MutableType()->SetKind(NKikimrMiniKQL::List); - if (result.ResultItemType) { - mkqlResult.MutableType()->MutableList()->MutableItem()->CopyFrom( - *result.ResultItemType); - } else { - mkqlResult.MutableType()->MutableList()->MutableItem()->CopyFrom( - result.ItemType); - } - - for(auto& row: result.Rows) { - ExportValueToProto( - result.MkqlItemType, row, *mkqlResult.MutableValue()->AddList(), - &result.ColumnOrder); - } - - } else { - YQL_ENSURE(result.Rows.size() == 1, "Actual buffer size: " << result.Rows.size()); - mkqlResult.MutableType()->CopyFrom(result.ItemType); - ExportValueToProto(result.MkqlItemType, result.Rows[0], *mkqlResult.MutableValue()); - } - } - - TTypedUnboxedValueVector UnboxedResult_; - - for(auto& txResult: TxResults) { - auto* type = txResult.MkqlItemType; - if (txResult.IsStream) { - auto* listOfItemType = NKikimr::NMiniKQL::TListType::Create(type, AllocState->TypeEnv); - NUdf::TUnboxedValue value = AllocState->HolderFactory.VectorAsArray(txResult.Rows); - if (txResult.ResultItemType) { - auto* type = ImportTypeFromProto(*txResult.ResultItemType, AllocState->TypeEnv); - auto* listType = NKikimr::NMiniKQL::TListType::Create(type, AllocState->TypeEnv); - UnboxedResult_.push_back(std::make_pair(listType, value)); - } else { - UnboxedResult_.push_back(std::make_pair(listOfItemType, value)); - } - } else { - UnboxedResult_.push_back(std::make_pair(type, txResult.Rows[0])); - } - } - - TxResults.crop(0); - return UnboxedResult_; -} - - void PrepareKqpTaskParameters(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, const TTask& task, NDqProto::TDqTask& dqTask, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory&) { diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp index d4e921f1392..3d83981ca7e 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp @@ -119,11 +119,11 @@ private: class TResultStreamChannelProxy : public TResultCommonChannelProxy { public: - TResultStreamChannelProxy(ui64 txId, ui64 channelId, const NKikimrMiniKQL::TType& itemType, - const NKikimrMiniKQL::TType* resultItemType, TActorId target, TQueryExecutionStats* stats, + TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, + const TVector<ui32>* columnOrder, TActorId target, TQueryExecutionStats* stats, TActorId executer) : TResultCommonChannelProxy(txId, channelId, stats, executer) - , ResultItemType(resultItemType) + , ColumnOrder(columnOrder) , ItemType(itemType) , Target(target) {} @@ -134,7 +134,7 @@ private: auto& channelData = computeData.GetChannelData(); TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry}; - auto resultSet = protoBuilder.BuildYdbResultSet({channelData.GetData()}, ItemType, ResultItemType); + auto resultSet = protoBuilder.BuildYdbResultSet({channelData.GetData()}, ItemType, ColumnOrder); auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>(); streamEv->Record.SetSeqNo(computeData.GetSeqNo()); @@ -147,8 +147,8 @@ private: Send(Target, streamEv.Release()); } private: - const NKikimrMiniKQL::TType* ResultItemType; - const NKikimrMiniKQL::TType& ItemType; + const TVector<ui32>* ColumnOrder; + NKikimr::NMiniKQL::TType* ItemType; const NActors::TActorId Target; }; @@ -183,15 +183,15 @@ private: } // anonymous namespace end -NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, const NKikimrMiniKQL::TType& itemType, - const NKikimrMiniKQL::TType* resultItemType, TActorId target, TQueryExecutionStats* stats, TActorId executer) +NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, + const TVector<ui32>* columnOrder, TActorId target, TQueryExecutionStats* stats, TActorId executer) { LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "CreateResultStreamChannelProxy: TxId: " << txId << ", channelId: " << channelId ); - return new TResultStreamChannelProxy(txId, channelId, itemType, resultItemType, target, stats, executer); + return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, target, stats, executer); } NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h index 907d163a46c..5cc8c54fb00 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.h +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.h @@ -25,8 +25,8 @@ namespace NKikimr::NKqp { struct TQueryExecutionStats; struct TKqpExecuterTxResult; -NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, const NKikimrMiniKQL::TType& itemType, - const NKikimrMiniKQL::TType* resultItemType, NActors::TActorId target, TQueryExecutionStats* stats, +NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, + const TVector<ui32>* columnOrder, NActors::TActorId target, TQueryExecutionStats* stats, NActors::TActorId executer); NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 93382dae47f..2a5d0e102ef 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -733,11 +733,8 @@ public: return ResultChannelProxies.begin()->second; } - auto resultType = ResponseEv->TxResults[0].ResultItemType ? - &ResponseEv->TxResults[0].ResultItemType.value() : nullptr; - - proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].ItemType, - resultType, Target, Stats.get(), SelfId()); + proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].MkqlItemType, + &ResponseEv->TxResults[0].ColumnOrder, Target, Stats.get(), SelfId()); } else { YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize()); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 351398b3df7..004b3fc7dd6 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -513,10 +513,13 @@ private: result.ExecuterResult.Swap(response->MutableResult()); { auto g = Parameters->TypeEnv().BindAllocator(); - auto unboxed = ev->Get()->GetUnboxedValueResults(); - Parameters->AddTxResults(std::move(unboxed)); + auto& txResults = ev->Get()->GetTxResults(); + result.Results.reserve(txResults.size()); + for(auto& tx : txResults) { + result.Results.emplace_back(std::move(tx.GetMkql())); + } + Parameters->AddTxResults(std::move(txResults)); } - result.Results = std::move(ev->Get()->GetMkqlResults()); Promise.SetValue(std::move(result)); this->PassAway(); } diff --git a/ydb/core/kqp/gateway/kqp_query_data.cpp b/ydb/core/kqp/gateway/kqp_query_data.cpp index 5af63049b25..f68c1545436 100644 --- a/ydb/core/kqp/gateway/kqp_query_data.cpp +++ b/ydb/core/kqp/gateway/kqp_query_data.cpp @@ -13,6 +13,53 @@ using namespace NKikimr::NMiniKQL; using namespace NYql; using namespace NYql::NUdf; +TTypedUnboxedValue TKqpExecuterTxResult::GetUV( + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& factory) +{ + if (IsStream) { + auto* listOfItemType = NKikimr::NMiniKQL::TListType::Create(MkqlItemType, typeEnv); + NUdf::TUnboxedValue value = factory.VectorAsArray(Rows); + return {listOfItemType, value}; + } else { + YQL_ENSURE(Rows.size() == 1, "Actual buffer size: " << Rows.size()); + return {MkqlItemType, Rows[0]}; + } +} + +NKikimrMiniKQL::TResult* TKqpExecuterTxResult::GetMkql(google::protobuf::Arena* arena) { + NKikimrMiniKQL::TResult* mkqlResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(arena); + FillMkql(mkqlResult); + return mkqlResult; +} + +NKikimrMiniKQL::TResult TKqpExecuterTxResult::GetMkql() { + NKikimrMiniKQL::TResult mkqlResult; + FillMkql(&mkqlResult); + return mkqlResult; +} + +void TKqpExecuterTxResult::FillMkql(NKikimrMiniKQL::TResult* mkqlResult) { + if (IsStream) { + mkqlResult->MutableType()->SetKind(NKikimrMiniKQL::List); + ExportTypeToProto( + MkqlItemType, + *mkqlResult->MutableType()->MutableList()->MutableItem(), + &ColumnOrder); + + for(auto& row: Rows) { + ExportValueToProto( + MkqlItemType, row, *mkqlResult->MutableValue()->AddList(), + &ColumnOrder); + } + + } else { + YQL_ENSURE(Rows.size() == 1, "Actual buffer size: " << Rows.size()); + ExportTypeToProto(MkqlItemType, *mkqlResult->MutableType()); + ExportValueToProto(MkqlItemType, Rows[0], *mkqlResult->MutableValue()); + } +} + TTxAllocatorState::TTxAllocatorState(const IFunctionRegistry* functionRegistry, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider) : Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators()) @@ -84,7 +131,7 @@ TQueryData::TQueryData(TTxAllocatorState::TPtr allocatorState) TQueryData::~TQueryData() { { auto g = TypeEnv().BindAllocator(); - TTxResultVector emptyVector; + TVector<TVector<TKqpExecuterTxResult>> emptyVector; TxResults.swap(emptyVector); TUnboxedParamsMap emptyMap; UnboxedData.swap(emptyMap); @@ -108,6 +155,19 @@ NKikimr::NMiniKQL::TType* TQueryData::GetParameterType(const TString& name) { return it->second.first; } +std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> TQueryData::GetTxResult(ui32 txIndex, ui32 resultIndex) { + return TxResults[txIndex][resultIndex].GetUV( + TypeEnv(), AllocState->HolderFactory); +} + +NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(ui32 txIndex, ui32 resultIndex, google::protobuf::Arena* arena) { + return TxResults[txIndex][resultIndex].GetMkql(arena); +} + +void TQueryData::AddTxResults(TVector<TKqpExecuterTxResult>&& results) { + TxResults.emplace_back(std::move(results)); +} + bool TQueryData::AddUVParam(const TString& name, NKikimr::NMiniKQL::TType* type, const NUdf::TUnboxedValue& value) { auto g = TypeEnv().BindAllocator(); auto [_, success] = UnboxedData.emplace(name, std::make_pair(type, value)); @@ -216,7 +276,7 @@ void TQueryData::Clear() { Params.clear(); TUnboxedParamsMap emptyMap; UnboxedData.swap(emptyMap); - TTxResultVector emptyVector; + TVector<TVector<TKqpExecuterTxResult>> emptyVector; TxResults.swap(emptyVector); AllocState->Reset(); } diff --git a/ydb/core/kqp/gateway/kqp_query_data.h b/ydb/core/kqp/gateway/kqp_query_data.h index c70bdcf70e6..fdbd41af47a 100644 --- a/ydb/core/kqp/gateway/kqp_query_data.h +++ b/ydb/core/kqp/gateway/kqp_query_data.h @@ -10,6 +10,7 @@ #include <util/generic/ptr.h> #include <util/generic/guid.h> +#include <google/protobuf/arena.h> #include <unordered_map> #include <vector> @@ -51,6 +52,19 @@ using TTxResultVector = std::vector< NKikimr::NMiniKQL::TMKQLAllocator<TTypedUnboxedValueVector> >; +struct TKqpExecuterTxResult { + NKikimr::NMiniKQL::TType* MkqlItemType; + TVector<ui32> ColumnOrder; + NKikimr::NMiniKQL::TUnboxedValueVector Rows; + bool IsStream = true; + + TTypedUnboxedValue GetUV(const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& factory); + NKikimrMiniKQL::TResult* GetMkql(google::protobuf::Arena* arena); + NKikimrMiniKQL::TResult GetMkql(); + void FillMkql(NKikimrMiniKQL::TResult* mkqlResult); +}; + struct TTimeAndRandomProvider { TIntrusivePtr<ITimeProvider> TimeProvider; TIntrusivePtr<IRandomProvider> RandomProvider; @@ -136,7 +150,7 @@ private: TParamMap Params; TUnboxedParamsMap UnboxedData; - TTxResultVector TxResults; + TVector<TVector<TKqpExecuterTxResult>> TxResults; TTxAllocatorState::TPtr AllocState; public: @@ -158,11 +172,7 @@ public: bool AddTypedValueParam(const TString& name, const Ydb::TypedValue& p); bool MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyParamBinding& paramBinding); - - void AddTxResults(TTypedUnboxedValueVector&& unboxedResults) { - auto g = TypeEnv().BindAllocator(); - TxResults.emplace_back(unboxedResults); - } + void AddTxResults(TVector<NKikimr::NKqp::TKqpExecuterTxResult>&& results); bool HasResult(ui32 txIndex, ui32 resultIndex) { if (txIndex >= TxResults.size()) @@ -171,9 +181,8 @@ public: return resultIndex < TxResults[txIndex].size(); } - std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetTxResult(ui32 txIndex, ui32 resultIndex) { - return TxResults[txIndex][resultIndex]; - } + TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex); + NKikimrMiniKQL::TResult* GetMkqlTxResult(ui32 txIndex, ui32 resultIndex, google::protobuf::Arena* arena); std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding); TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name); diff --git a/ydb/core/kqp/runtime/kqp_transport.cpp b/ydb/core/kqp/runtime/kqp_transport.cpp index 411acd810de..6ee6d5163e8 100644 --- a/ydb/core/kqp/runtime/kqp_transport.cpp +++ b/ydb/core/kqp/runtime/kqp_transport.cpp @@ -46,30 +46,21 @@ TKqpProtoBuilder::~TKqpProtoBuilder() { } } -Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(const TVector<NDqProto::TData>& data, - const NKikimrMiniKQL::TType& srcRowType, const NKikimrMiniKQL::TType* dstRowType) +Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet( + const TVector<NDqProto::TData>& data, + NKikimr::NMiniKQL::TType* mkqlSrcRowType, + const TVector<ui32>* columnOrder) { - YQL_ENSURE(srcRowType.GetKind() == NKikimrMiniKQL::Struct); - - auto* mkqlSrcRowType = NMiniKQL::ImportTypeFromProto(srcRowType, *TypeEnv); - auto* mkqlSrcRowStructType = static_cast<TStructType*>(mkqlSrcRowType); + YQL_ENSURE(mkqlSrcRowType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct); + const auto* mkqlSrcRowStructType = static_cast<const TStructType*>(mkqlSrcRowType); Ydb::ResultSet resultSet; - if (dstRowType) { - YQL_ENSURE(dstRowType->GetKind() == NKikimrMiniKQL::Struct); - - for (auto& member : dstRowType->GetStruct().GetMember()) { - auto* column = resultSet.add_columns(); - column->set_name(member.GetName()); - ConvertMiniKQLTypeToYdbType(member.GetType(), *column->mutable_type()); - } - } else { - for (auto& member : srcRowType.GetStruct().GetMember()) { - auto* column = resultSet.add_columns(); - column->set_name(member.GetName()); - ConvertMiniKQLTypeToYdbType(member.GetType(), *column->mutable_type()); - } + for (ui32 idx = 0; idx < mkqlSrcRowStructType->GetMembersCount(); ++idx) { + auto* column = resultSet.add_columns(); + ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? idx : (*columnOrder)[idx]; + column->set_name(TString(mkqlSrcRowStructType->GetMemberName(memberIndex))); + ExportTypeToProto(mkqlSrcRowStructType->GetMemberType(memberIndex), *column->mutable_type()); } THolder<TGuard<TScopedAlloc>> guard; @@ -79,39 +70,15 @@ Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(const TVector<NDqProto::TData auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; if (!data.empty()) { - switch (data.front().GetTransportVersion()) { - case 10000: { - transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_YSON_1_0; - break; - } - case 20000: { - transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0; - break; - } - case 30000: { - transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_ARROW_1_0; - break; - } - default: - transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; - } + transportVersion = static_cast<NDqProto::EDataTransportVersion>(data.front().GetTransportVersion()); } NDq::TDqDataSerializer dataSerializer(*TypeEnv, *HolderFactory, transportVersion); - for (auto& part : data) { if (part.GetRows()) { TUnboxedValueVector rows; dataSerializer.Deserialize(part, mkqlSrcRowType, rows); - - TVector<ui32> columnOrder; - if (dstRowType) { - for(auto& dstMember: dstRowType->GetStruct().GetMember()) { - columnOrder.push_back(mkqlSrcRowStructType->GetMemberIndex(dstMember.GetName())); - } - } - for (auto& row : rows) { - ExportValueToProto(mkqlSrcRowType, row, *resultSet.add_rows(), &columnOrder); + ExportValueToProto(mkqlSrcRowType, row, *resultSet.add_rows(), columnOrder); } } } diff --git a/ydb/core/kqp/runtime/kqp_transport.h b/ydb/core/kqp/runtime/kqp_transport.h index 8de38dd698a..989aea6e3f6 100644 --- a/ydb/core/kqp/runtime/kqp_transport.h +++ b/ydb/core/kqp/runtime/kqp_transport.h @@ -20,7 +20,7 @@ public: ~TKqpProtoBuilder(); Ydb::ResultSet BuildYdbResultSet(const TVector<NYql::NDqProto::TData>& data, - const NKikimrMiniKQL::TType& srcRowType, const NKikimrMiniKQL::TType* dstRowType = nullptr); + NKikimr::NMiniKQL::TType* srcRowType, const TVector<ui32>* columnOrder = nullptr); private: NMiniKQL::TScopedAlloc* Alloc = nullptr; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index f50355f06cd..b9e1afcc8cb 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -134,8 +134,7 @@ struct TKqpQueryState { TKqpCompileResult::TConstPtr CompileResult; NKqpProto::TKqpStatsCompile CompileStats; TIntrusivePtr<TKqpTransactionContext> TxCtx; - TQueryData::TPtr Parameters; - TVector<TVector<NKikimrMiniKQL::TResult>> TxResults; + TQueryData::TPtr QueryData; TActorId RequestActorId; @@ -380,7 +379,7 @@ public: return; } QueryState->TxCtx = std::move(txCtx); - QueryState->Parameters = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); + QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); QueryState->TxId = txId; QueryState->TxId_Human = txControl.tx_id(); if (!CheckTransacionLocks()) { @@ -811,7 +810,7 @@ public: QueryState->TxId = UlidGen.Next(); QueryState->TxId_Human = QueryState->TxId.ToString(); QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); - QueryState->Parameters = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); + QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); QueryState->TxCtx->SetIsolationLevel(settings); CreateNewTx(); @@ -849,7 +848,7 @@ public: return false; } QueryState->TxCtx = *it; - QueryState->Parameters = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); + QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); QueryState->TxId = txId; QueryState->TxId_Human = txControl.tx_id(); break; @@ -866,7 +865,7 @@ public: } else { QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); - QueryState->Parameters = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); + QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc); QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; } @@ -915,7 +914,7 @@ public: for(const auto& [name, param] : params) { try { - auto success = QueryState->Parameters->AddTypedValueParam(name, param); + auto success = QueryState->QueryData->AddTypedValueParam(name, param); YQL_ENSURE(success, "Duplicate parameter: " << name); } catch(const yexception& ex) { ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); @@ -933,7 +932,7 @@ public: for (ui32 i = 0; i < structType.MemberSize(); ++i) { const auto& memberName = structType.GetMember(i).GetName(); YQL_ENSURE(i < parameters.GetValue().StructSize(), "Missing value for parameter: " << memberName); - auto success = QueryState->Parameters->AddMkqlParam(memberName, structType.GetMember(i).GetType(), parameters.GetValue().GetStruct(i)); + auto success = QueryState->QueryData->AddMkqlParam(memberName, structType.GetMember(i).GetType(), parameters.GetValue().GetStruct(i)); YQL_ENSURE(success, "Duplicate parameter: " << memberName); } } @@ -1005,11 +1004,11 @@ public: void ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) { auto& txCtx = QueryState->TxCtx; YQL_ENSURE(txCtx); - auto parameterType = QueryState->Parameters->GetParameterType(name); + auto parameterType = QueryState->QueryData->GetParameterType(name); if (!parameterType) { if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) { NKikimrMiniKQL::TValue value; - QueryState->Parameters->AddMkqlParam(name, type, value); + QueryState->QueryData->AddMkqlParam(name, type, value); return; } @@ -1030,12 +1029,12 @@ public: try { for(const auto& paramBinding: tx.GetParamBindings()) { - QueryState->Parameters->MaterializeParamValue(true, paramBinding); + QueryState->QueryData->MaterializeParamValue(true, paramBinding); } } catch (const yexception& ex) { ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); } - return QueryState->Parameters; + return QueryState->QueryData; } bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query) { @@ -1073,9 +1072,9 @@ public: TQueryData::TPtr CreateKqpValueMap(const NKqpProto::TKqpPhyTx& tx) { for (const auto& paramBinding : tx.GetParamBindings()) { - QueryState->Parameters->MaterializeParamValue(true, paramBinding); + QueryState->QueryData->MaterializeParamValue(true, paramBinding); } - return QueryState->Parameters; + return QueryState->QueryData; } bool CheckTransacionLocks() { @@ -1389,12 +1388,9 @@ public: auto& executerResults = *response->MutableResult(); { - auto g = QueryState->Parameters->TypeEnv().BindAllocator(); - auto unboxed = ev->Get()->GetUnboxedValueResults(); - QueryState->Parameters->AddTxResults(std::move(unboxed)); + auto g = QueryState->QueryData->TypeEnv().BindAllocator(); + QueryState->QueryData->AddTxResults(std::move(ev->Get()->GetTxResults())); } - auto& txResult = ev->Get()->GetMkqlResults(); - QueryState->TxResults.emplace_back(std::move(txResult)); if (ev->Get()->LockHandle) { QueryState->TxCtx->Locks.LockHandle = std::move(ev->Get()->LockHandle); @@ -1646,10 +1642,9 @@ public: auto txIndex = rb.GetTxResultBinding().GetTxIndex(); auto resultIndex = rb.GetTxResultBinding().GetResultIndex(); - auto& txResults = QueryState->TxResults; - YQL_ENSURE(txIndex < txResults.size()); - YQL_ENSURE(resultIndex < txResults[txIndex].size()); - + YQL_ENSURE(QueryState->QueryData->HasResult(txIndex, resultIndex)); + auto g = QueryState->QueryData->TypeEnv().BindAllocator(); + auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, arena.get()); std::optional<IDataProvider::TFillSettings> fillSettings; if (QueryState->PreparedQuery->ResultsSize()) { YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), "" @@ -1660,13 +1655,11 @@ public: fillSettings->RowsLimitPerWrite = result.GetRowsLimit(); } } - - auto* protoRes = KikimrResultToProto(txResults[txIndex][resultIndex], {}, - fillSettings.value_or(FillSettings), arena.get()); + auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), arena.get()); if (useYdbResponseFormat) { - ConvertKqpQueryResultToDbResult(*protoRes, response->AddYdbResults()); + ConvertKqpQueryResultToDbResult(*finalResult, response->AddYdbResults()); } else { - response->AddResults()->Swap(protoRes); + response->AddResults()->Swap(finalResult); } } } diff --git a/ydb/library/mkql_proto/mkql_proto.cpp b/ydb/library/mkql_proto/mkql_proto.cpp index 62708363a01..e0ab361fb95 100644 --- a/ydb/library/mkql_proto/mkql_proto.cpp +++ b/ydb/library/mkql_proto/mkql_proto.cpp @@ -19,7 +19,7 @@ namespace NKikimr::NMiniKQL { namespace { -void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res); +void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res, const TVector<ui32>* columnOrder = nullptr); Y_FORCE_INLINE void HandleKindDataExport(const TType* type, const NUdf::TUnboxedValuePod& value, Ydb::Value& res) { auto dataType = static_cast<const TDataType*>(type); @@ -114,11 +114,12 @@ Y_FORCE_INLINE void HandleKindDataExport(const TType* type, const NUdf::TUnboxed } template<typename TOut> -Y_FORCE_INLINE void ExportStructTypeToProto(TStructType* structType, TOut& res) { +Y_FORCE_INLINE void ExportStructTypeToProto(TStructType* structType, TOut& res, const TVector<ui32>* columnOrder) { for (ui32 index = 0; index < structType->GetMembersCount(); ++index) { auto newMember = res.AddMember(); - newMember->SetName(TString(structType->GetMemberName(index))); - ExportTypeToProtoImpl(structType->GetMemberType(index), *newMember->MutableType()); + ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? index : (*columnOrder)[index]; + newMember->SetName(TString(structType->GetMemberName(memberIndex))); + ExportTypeToProtoImpl(structType->GetMemberType(memberIndex), *newMember->MutableType()); } } @@ -129,7 +130,7 @@ Y_FORCE_INLINE void ExportTupleTypeToProto(TTupleType* tupleType, TOut& res) { } } -void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) { +void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res, const TVector<ui32>* columnOrder) { switch (type->GetKind()) { case TType::EKind::Void: res.SetKind(NKikimrMiniKQL::ETypeKind::Void); @@ -187,7 +188,7 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) { auto structType = static_cast<TStructType *>(type); res.SetKind(NKikimrMiniKQL::ETypeKind::Struct); if (structType->GetMembersCount()) { - ExportStructTypeToProto(structType, *res.MutableStruct()); + ExportStructTypeToProto(structType, *res.MutableStruct(), columnOrder); } break; } @@ -217,7 +218,7 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) { TStructType *structType = static_cast<TStructType *>(innerType); auto resItems = res.MutableVariant()->MutableStructItems(); if (structType->GetMembersCount()) { - ExportStructTypeToProto(structType, *resItems); + ExportStructTypeToProto(structType, *resItems, nullptr); } } else if (innerType->IsTuple()) { auto resItems = res.MutableVariant()->MutableTupleItems(); @@ -237,7 +238,7 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) { } } -void ExportTypeToProtoImpl(TType* type, Ydb::Type& res) { +void ExportTypeToProtoImpl(TType* type, Ydb::Type& res, const TVector<ui32>* columnOrder = nullptr) { switch (type->GetKind()) { case TType::EKind::Void: res.set_void_type(::google::protobuf::NULL_VALUE); @@ -303,8 +304,9 @@ void ExportTypeToProtoImpl(TType* type, Ydb::Type& res) { auto resStruct = res.mutable_struct_type(); for (ui32 index = 0; index < structType->GetMembersCount(); ++index) { auto newMember = resStruct->add_members(); - newMember->set_name(TString(structType->GetMemberName(index))); - ExportTypeToProtoImpl(structType->GetMemberType(index), *newMember->mutable_type()); + ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? index : (*columnOrder)[index]; + newMember->set_name(TString(structType->GetMemberName(memberIndex))); + ExportTypeToProtoImpl(structType->GetMemberType(memberIndex), *newMember->mutable_type()); } break; @@ -803,8 +805,8 @@ void ExportPrimitiveTypeToProto(ui32 schemeType, Ydb::Type& output) { } } -void ExportTypeToProto(TType* type, Ydb::Type& res) { - ExportTypeToProtoImpl(type, res); +void ExportTypeToProto(TType* type, Ydb::Type& res, const TVector<ui32>* columnOrder) { + ExportTypeToProtoImpl(type, res, columnOrder); } void ExportValueToProto(TType* type, const NUdf::TUnboxedValuePod& value, Ydb::Value& res, const TVector<ui32>* columnOrder) { @@ -1267,8 +1269,8 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV MKQL_ENSURE(false, TStringBuilder() << "Unknown protobuf type: " << type->GetKindAsStr()); } -void ExportTypeToProto(TType* type, NKikimrMiniKQL::TType& res) { - ExportTypeToProtoImpl(type, res); +void ExportTypeToProto(TType* type, NKikimrMiniKQL::TType& res, const TVector<ui32>* columnOrder) { + ExportTypeToProtoImpl(type, res, columnOrder); } void ExportValueToProto(TType* type, const NUdf::TUnboxedValuePod& value, NKikimrMiniKQL::TValue& res, const TVector<ui32>* columnOrder) { diff --git a/ydb/library/mkql_proto/mkql_proto.h b/ydb/library/mkql_proto/mkql_proto.h index f7c0062843c..3514d2a297b 100644 --- a/ydb/library/mkql_proto/mkql_proto.h +++ b/ydb/library/mkql_proto/mkql_proto.h @@ -9,12 +9,12 @@ namespace NKikimr::NMiniKQL { class THolderFactory; -void ExportTypeToProto(TType* type, NKikimrMiniKQL::TType& res); +void ExportTypeToProto(TType* type, NKikimrMiniKQL::TType& res, const TVector<ui32>* columnOrder = nullptr); void ExportValueToProto(TType* type, const NUdf::TUnboxedValuePod& value, NKikimrMiniKQL::TValue& res, const TVector<ui32>* columnOrder = nullptr); void ExportPrimitiveTypeToProto(ui32 schemeType, Ydb::Type& output); -void ExportTypeToProto(TType* type, Ydb::Type& res); +void ExportTypeToProto(TType* type, Ydb::Type& res, const TVector<ui32>* columnOrder = nullptr); void ExportValueToProto(TType* type, const NUdf::TUnboxedValuePod& value, Ydb::Value& res, const TVector<ui32>* columnOrder = nullptr); |