diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-17 15:10:42 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-17 15:10:42 +0300 |
commit | ccc1f1bc46a5282af7e0372de86bc26b71adc462 (patch) | |
tree | bc1efe36b7aade30008c0b67b6cc567e9e9d1d65 | |
parent | 86cdce38e8520ed2e289acb04dbdb6230c0e1dbf (diff) | |
download | ydb-ccc1f1bc46a5282af7e0372de86bc26b71adc462.tar.gz |
Refactor parameters code in SessionActor
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.cpp | 49 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.h | 11 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 75 |
5 files changed, 74 insertions, 65 deletions
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 1f03ca80f1..c6d5c0dba0 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -725,7 +725,7 @@ struct TEvKqp { NLWTrace::TOrbit Orbit; }; - struct TEvKqpProxyPublishRequest : + struct TEvKqpProxyPublishRequest : public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {}; struct TEvCompileInvalidateRequest : public TEventLocal<TEvCompileInvalidateRequest, diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 96b8c61549..576f5a2c43 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -511,7 +511,7 @@ private: auto& txResults = ev->GetTxResults(); result.Results.reserve(txResults.size()); for(auto& tx : txResults) { - result.Results.emplace_back(std::move(tx.GetMkql())); + result.Results.emplace_back(tx.GetMkql()); } Parameters->AddTxHolders(std::move(ev->GetTxHolders())); Parameters->AddTxResults(std::move(txResults)); diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index d16c3eabc8..1bee2405dd 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -1,11 +1,12 @@ #include "kqp_query_data.h" #include <ydb/core/protos/kqp_physical.pb.h> + #include <ydb/library/mkql_proto/mkql_proto.h> -#include <ydb/library/yql/minikql/mkql_string_util.h> -#include <ydb/library/yql/utils/yql_panic.h> #include <ydb/library/yql/dq/runtime/dq_transport.h> +#include <ydb/library/yql/minikql/mkql_string_util.h> #include <ydb/library/yql/public/udf/udf_data_type.h> +#include <ydb/library/yql/utils/yql_panic.h> namespace NKikimr::NKqp { @@ -160,11 +161,17 @@ std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> TQueryData::GetTxResul TypeEnv(), AllocState->HolderFactory); } -NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(ui32 txIndex, ui32 resultIndex, google::protobuf::Arena* arena) { +NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) { + auto txIndex = rb.GetTxResultBinding().GetTxIndex(); + auto resultIndex = rb.GetTxResultBinding().GetResultIndex(); + + YQL_ENSURE(HasResult(txIndex, resultIndex)); + auto g = TypeEnv().BindAllocator(); return TxResults[txIndex][resultIndex].GetMkql(arena); } void TQueryData::AddTxResults(TVector<TKqpExecuterTxResult>&& results) { + auto g = TypeEnv().BindAllocator(); TxResults.emplace_back(std::move(results)); } @@ -172,6 +179,42 @@ void TQueryData::AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders) { TxHolders.emplace_back(std::move(holders)); } +void TQueryData::ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type, NMiniKQL::TTypeEnvironment& txTypeEnv) { + auto parameterType = GetParameterType(name); + if (!parameterType) { + if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) { + NKikimrMiniKQL::TValue value; + AddMkqlParam(name, type, value); + return; + } + ythrow yexception() << "Missing value for parameter: " << name; + } + + auto pType = ImportTypeFromProto(type, txTypeEnv); + if (pType == nullptr || !parameterType->IsSameType(*pType)) { + ythrow yexception() << "Parameter " << name + << " type mismatch, expected: " << type << ", actual: " << *parameterType; + } +} + +void TQueryData::PrepareParameters(const TKqpPhyTxHolder::TConstPtr& tx, const TPreparedQueryHolder::TConstPtr& preparedQuery, + NMiniKQL::TTypeEnvironment& txTypeEnv) +{ + for (const auto& paramDesc : preparedQuery->GetParameters()) { + ValidateParameter(paramDesc.GetName(), paramDesc.GetType(), txTypeEnv); + } + + for(const auto& paramBinding: tx->GetParamBindings()) { + MaterializeParamValue(true, paramBinding); + } +} + +void TQueryData::CreateKqpValueMap(const TKqpPhyTxHolder::TConstPtr& tx) { + for (const auto& paramBinding : tx->GetParamBindings()) { + MaterializeParamValue(true, paramBinding); + } +} + bool TQueryData::AddUVParam(const TString& name, NKikimr::NMiniKQL::TType* type, const NUdf::TUnboxedValue& value) { auto g = TypeEnv().BindAllocator(); auto [_, success] = UnboxedData.emplace(name, std::make_pair(type, value)); diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index de2ca09978..4d41a4fedc 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -1,9 +1,11 @@ #pragma once #include <ydb/core/kqp/query_data/kqp_prepared_query.h> +#include <ydb/library/yql/core/yql_data_provider.h> #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/mkql_proto/mkql_proto.h> #include <ydb/library/mkql_proto/protos/minikql.pb.h> #include <library/cpp/random_provider/random_provider.h> @@ -183,7 +185,7 @@ public: bool AddTypedValueParam(const TString& name, const Ydb::TypedValue& p); bool MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyParamBinding& paramBinding); - void AddTxResults(TVector<NKikimr::NKqp::TKqpExecuterTxResult>&& results); + void AddTxResults(TVector<TKqpExecuterTxResult>&& results); void AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders); bool HasResult(ui32 txIndex, ui32 resultIndex) { @@ -193,8 +195,13 @@ public: return resultIndex < TxResults[txIndex].size(); } + void ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type, NMiniKQL::TTypeEnvironment& txTypeEnv); + void PrepareParameters(const TKqpPhyTxHolder::TConstPtr& tx, const TPreparedQueryHolder::TConstPtr& preparedQuery, + NMiniKQL::TTypeEnvironment& txTypeEnv); + void CreateKqpValueMap(const TKqpPhyTxHolder::TConstPtr& tx); + TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex); - NKikimrMiniKQL::TResult* GetMkqlTxResult(ui32 txIndex, ui32 resultIndex, google::protobuf::Arena* arena); + NKikimrMiniKQL::TResult* GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena); std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding); TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index a52d197028..e3418d9acc 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -815,48 +815,6 @@ public: } } - void ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) { - auto& txCtx = QueryState->TxCtx; - YQL_ENSURE(txCtx); - auto parameterType = QueryState->QueryData->GetParameterType(name); - if (!parameterType) { - if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) { - NKikimrMiniKQL::TValue value; - QueryState->QueryData->AddMkqlParam(name, type, value); - return; - } - ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Missing value for parameter: " << name; - } - - auto pType = ImportTypeFromProto(type, txCtx->TxAlloc->TypeEnv); - if (pType == nullptr || !parameterType->IsSameType(*pType)) { - ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name - << " type mismatch, expected: " << type << ", actual: " << *parameterType; - } - } - - TQueryData::TPtr PrepareParameters(const TKqpPhyTxHolder::TConstPtr& tx) { - for (const auto& paramDesc : QueryState->PreparedQuery->GetParameters()) { - ValidateParameter(paramDesc.GetName(), paramDesc.GetType()); - } - - try { - for(const auto& paramBinding: tx->GetParamBindings()) { - QueryState->QueryData->MaterializeParamValue(true, paramBinding); - } - } catch (const yexception& ex) { - ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); - } - return QueryState->QueryData; - } - - TQueryData::TPtr CreateKqpValueMap(const TKqpPhyTxHolder::TConstPtr& tx) { - for (const auto& paramBinding : tx->GetParamBindings()) { - QueryState->QueryData->MaterializeParamValue(true, paramBinding); - } - return QueryState->QueryData; - } - bool CheckTransacionLocks() { auto& txCtx = *QueryState->TxCtx; if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) { @@ -900,7 +858,12 @@ public: auto tx = QueryState->PreparedQuery->GetPhyTxOrEmpty(QueryState->CurrentTx); if (!Config->FeatureFlags.GetEnableKqpImmediateEffects()) { while (tx && tx->GetHasEffects()) { - bool success = txCtx.AddDeferredEffect(tx, CreateKqpValueMap(tx)); + try { + QueryState->QueryData->CreateKqpValueMap(tx); + } catch (const yexception& ex) { + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); + } + bool success = txCtx.AddDeferredEffect(tx, QueryState->QueryData); YQL_ENSURE(success); LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { @@ -984,7 +947,12 @@ public: YQL_ENSURE(false, "Unexpected physical tx type in data query: " << (ui32)tx->GetType()); } - request.Transactions.emplace_back(tx, PrepareParameters(tx)); + try { + QueryState->QueryData->PrepareParameters(tx, QueryState->PreparedQuery, txCtx.TxAlloc->TypeEnv); + } catch (const yexception& ex) { + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); + } + request.Transactions.emplace_back(tx, QueryState->QueryData); txCtx.HasImmediateEffects = txCtx.HasImmediateEffects || tx->GetHasEffects(); } else { YQL_ENSURE(commit); @@ -1164,17 +1132,14 @@ public: YQL_ENSURE(QueryState); LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->ResultRowsCount); - auto& executerResults = *response->MutableResult(); - { - auto g = QueryState->QueryData->TypeEnv().BindAllocator(); - QueryState->QueryData->AddTxResults(std::move(ev->GetTxResults())); - QueryState->QueryData->AddTxHolders(std::move(ev->GetTxHolders())); - } + QueryState->QueryData->AddTxResults(std::move(ev->GetTxResults())); + QueryState->QueryData->AddTxHolders(std::move(ev->GetTxHolders())); if (ev->LockHandle) { QueryState->TxCtx->Locks.LockHandle = std::move(ev->LockHandle); } + auto& executerResults = *response->MutableResult(); if (!MergeLocksWithTxResult(executerResults)) { return; } @@ -1389,18 +1354,12 @@ public: response->SetPreparedQuery(queryId); } - bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat(); // Result for scan query is sent directly to target actor. if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) { + bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat(); auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { - auto& rb = phyQuery.GetResultBindings(i); - auto txIndex = rb.GetTxResultBinding().GetTxIndex(); - auto resultIndex = rb.GetTxResultBinding().GetResultIndex(); - - YQL_ENSURE(QueryState->QueryData->HasResult(txIndex, resultIndex)); - auto g = QueryState->QueryData->TypeEnv().BindAllocator(); - auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, arena.get()); + auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), arena.get()); std::optional<IDataProvider::TFillSettings> fillSettings; if (QueryState->PreparedQuery->ResultsSize()) { YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), "" |