aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-03-17 15:10:42 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-03-17 15:10:42 +0300
commitccc1f1bc46a5282af7e0372de86bc26b71adc462 (patch)
treebc1efe36b7aade30008c0b67b6cc567e9e9d1d65
parent86cdce38e8520ed2e289acb04dbdb6230c0e1dbf (diff)
downloadydb-ccc1f1bc46a5282af7e0372de86bc26b71adc462.tar.gz
Refactor parameters code in SessionActor
-rw-r--r--ydb/core/kqp/common/kqp.h2
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp2
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.cpp49
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.h11
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp75
5 files changed, 74 insertions, 65 deletions
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 1f03ca80f1..c6d5c0dba0 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -725,7 +725,7 @@ struct TEvKqp {
NLWTrace::TOrbit Orbit;
};
- struct TEvKqpProxyPublishRequest :
+ struct TEvKqpProxyPublishRequest :
public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {};
struct TEvCompileInvalidateRequest : public TEventLocal<TEvCompileInvalidateRequest,
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 96b8c61549..576f5a2c43 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -511,7 +511,7 @@ private:
auto& txResults = ev->GetTxResults();
result.Results.reserve(txResults.size());
for(auto& tx : txResults) {
- result.Results.emplace_back(std::move(tx.GetMkql()));
+ result.Results.emplace_back(tx.GetMkql());
}
Parameters->AddTxHolders(std::move(ev->GetTxHolders()));
Parameters->AddTxResults(std::move(txResults));
diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp
index d16c3eabc8..1bee2405dd 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.cpp
+++ b/ydb/core/kqp/query_data/kqp_query_data.cpp
@@ -1,11 +1,12 @@
#include "kqp_query_data.h"
#include <ydb/core/protos/kqp_physical.pb.h>
+
#include <ydb/library/mkql_proto/mkql_proto.h>
-#include <ydb/library/yql/minikql/mkql_string_util.h>
-#include <ydb/library/yql/utils/yql_panic.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
+#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
+#include <ydb/library/yql/utils/yql_panic.h>
namespace NKikimr::NKqp {
@@ -160,11 +161,17 @@ std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> TQueryData::GetTxResul
TypeEnv(), AllocState->HolderFactory);
}
-NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(ui32 txIndex, ui32 resultIndex, google::protobuf::Arena* arena) {
+NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) {
+ auto txIndex = rb.GetTxResultBinding().GetTxIndex();
+ auto resultIndex = rb.GetTxResultBinding().GetResultIndex();
+
+ YQL_ENSURE(HasResult(txIndex, resultIndex));
+ auto g = TypeEnv().BindAllocator();
return TxResults[txIndex][resultIndex].GetMkql(arena);
}
void TQueryData::AddTxResults(TVector<TKqpExecuterTxResult>&& results) {
+ auto g = TypeEnv().BindAllocator();
TxResults.emplace_back(std::move(results));
}
@@ -172,6 +179,42 @@ void TQueryData::AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders) {
TxHolders.emplace_back(std::move(holders));
}
+void TQueryData::ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type, NMiniKQL::TTypeEnvironment& txTypeEnv) {
+ auto parameterType = GetParameterType(name);
+ if (!parameterType) {
+ if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) {
+ NKikimrMiniKQL::TValue value;
+ AddMkqlParam(name, type, value);
+ return;
+ }
+ ythrow yexception() << "Missing value for parameter: " << name;
+ }
+
+ auto pType = ImportTypeFromProto(type, txTypeEnv);
+ if (pType == nullptr || !parameterType->IsSameType(*pType)) {
+ ythrow yexception() << "Parameter " << name
+ << " type mismatch, expected: " << type << ", actual: " << *parameterType;
+ }
+}
+
+void TQueryData::PrepareParameters(const TKqpPhyTxHolder::TConstPtr& tx, const TPreparedQueryHolder::TConstPtr& preparedQuery,
+ NMiniKQL::TTypeEnvironment& txTypeEnv)
+{
+ for (const auto& paramDesc : preparedQuery->GetParameters()) {
+ ValidateParameter(paramDesc.GetName(), paramDesc.GetType(), txTypeEnv);
+ }
+
+ for(const auto& paramBinding: tx->GetParamBindings()) {
+ MaterializeParamValue(true, paramBinding);
+ }
+}
+
+void TQueryData::CreateKqpValueMap(const TKqpPhyTxHolder::TConstPtr& tx) {
+ for (const auto& paramBinding : tx->GetParamBindings()) {
+ MaterializeParamValue(true, paramBinding);
+ }
+}
+
bool TQueryData::AddUVParam(const TString& name, NKikimr::NMiniKQL::TType* type, const NUdf::TUnboxedValue& value) {
auto g = TypeEnv().BindAllocator();
auto [_, success] = UnboxedData.emplace(name, std::make_pair(type, value));
diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index de2ca09978..4d41a4fedc 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -1,9 +1,11 @@
#pragma once
#include <ydb/core/kqp/query_data/kqp_prepared_query.h>
+#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/mkql_proto/mkql_proto.h>
#include <ydb/library/mkql_proto/protos/minikql.pb.h>
#include <library/cpp/random_provider/random_provider.h>
@@ -183,7 +185,7 @@ public:
bool AddTypedValueParam(const TString& name, const Ydb::TypedValue& p);
bool MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyParamBinding& paramBinding);
- void AddTxResults(TVector<NKikimr::NKqp::TKqpExecuterTxResult>&& results);
+ void AddTxResults(TVector<TKqpExecuterTxResult>&& results);
void AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders);
bool HasResult(ui32 txIndex, ui32 resultIndex) {
@@ -193,8 +195,13 @@ public:
return resultIndex < TxResults[txIndex].size();
}
+ void ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type, NMiniKQL::TTypeEnvironment& txTypeEnv);
+ void PrepareParameters(const TKqpPhyTxHolder::TConstPtr& tx, const TPreparedQueryHolder::TConstPtr& preparedQuery,
+ NMiniKQL::TTypeEnvironment& txTypeEnv);
+ void CreateKqpValueMap(const TKqpPhyTxHolder::TConstPtr& tx);
+
TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex);
- NKikimrMiniKQL::TResult* GetMkqlTxResult(ui32 txIndex, ui32 resultIndex, google::protobuf::Arena* arena);
+ NKikimrMiniKQL::TResult* GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);
std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding);
TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index a52d197028..e3418d9acc 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -815,48 +815,6 @@ public:
}
}
- void ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) {
- auto& txCtx = QueryState->TxCtx;
- YQL_ENSURE(txCtx);
- auto parameterType = QueryState->QueryData->GetParameterType(name);
- if (!parameterType) {
- if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) {
- NKikimrMiniKQL::TValue value;
- QueryState->QueryData->AddMkqlParam(name, type, value);
- return;
- }
- ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Missing value for parameter: " << name;
- }
-
- auto pType = ImportTypeFromProto(type, txCtx->TxAlloc->TypeEnv);
- if (pType == nullptr || !parameterType->IsSameType(*pType)) {
- ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name
- << " type mismatch, expected: " << type << ", actual: " << *parameterType;
- }
- }
-
- TQueryData::TPtr PrepareParameters(const TKqpPhyTxHolder::TConstPtr& tx) {
- for (const auto& paramDesc : QueryState->PreparedQuery->GetParameters()) {
- ValidateParameter(paramDesc.GetName(), paramDesc.GetType());
- }
-
- try {
- for(const auto& paramBinding: tx->GetParamBindings()) {
- QueryState->QueryData->MaterializeParamValue(true, paramBinding);
- }
- } catch (const yexception& ex) {
- ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
- }
- return QueryState->QueryData;
- }
-
- TQueryData::TPtr CreateKqpValueMap(const TKqpPhyTxHolder::TConstPtr& tx) {
- for (const auto& paramBinding : tx->GetParamBindings()) {
- QueryState->QueryData->MaterializeParamValue(true, paramBinding);
- }
- return QueryState->QueryData;
- }
-
bool CheckTransacionLocks() {
auto& txCtx = *QueryState->TxCtx;
if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) {
@@ -900,7 +858,12 @@ public:
auto tx = QueryState->PreparedQuery->GetPhyTxOrEmpty(QueryState->CurrentTx);
if (!Config->FeatureFlags.GetEnableKqpImmediateEffects()) {
while (tx && tx->GetHasEffects()) {
- bool success = txCtx.AddDeferredEffect(tx, CreateKqpValueMap(tx));
+ try {
+ QueryState->QueryData->CreateKqpValueMap(tx);
+ } catch (const yexception& ex) {
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
+ }
+ bool success = txCtx.AddDeferredEffect(tx, QueryState->QueryData);
YQL_ENSURE(success);
LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
@@ -984,7 +947,12 @@ public:
YQL_ENSURE(false, "Unexpected physical tx type in data query: " << (ui32)tx->GetType());
}
- request.Transactions.emplace_back(tx, PrepareParameters(tx));
+ try {
+ QueryState->QueryData->PrepareParameters(tx, QueryState->PreparedQuery, txCtx.TxAlloc->TypeEnv);
+ } catch (const yexception& ex) {
+ ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
+ }
+ request.Transactions.emplace_back(tx, QueryState->QueryData);
txCtx.HasImmediateEffects = txCtx.HasImmediateEffects || tx->GetHasEffects();
} else {
YQL_ENSURE(commit);
@@ -1164,17 +1132,14 @@ public:
YQL_ENSURE(QueryState);
LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->ResultRowsCount);
- auto& executerResults = *response->MutableResult();
- {
- auto g = QueryState->QueryData->TypeEnv().BindAllocator();
- QueryState->QueryData->AddTxResults(std::move(ev->GetTxResults()));
- QueryState->QueryData->AddTxHolders(std::move(ev->GetTxHolders()));
- }
+ QueryState->QueryData->AddTxResults(std::move(ev->GetTxResults()));
+ QueryState->QueryData->AddTxHolders(std::move(ev->GetTxHolders()));
if (ev->LockHandle) {
QueryState->TxCtx->Locks.LockHandle = std::move(ev->LockHandle);
}
+ auto& executerResults = *response->MutableResult();
if (!MergeLocksWithTxResult(executerResults)) {
return;
}
@@ -1389,18 +1354,12 @@ public:
response->SetPreparedQuery(queryId);
}
- bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
// Result for scan query is sent directly to target actor.
if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) {
+ bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
- auto& rb = phyQuery.GetResultBindings(i);
- auto txIndex = rb.GetTxResultBinding().GetTxIndex();
- auto resultIndex = rb.GetTxResultBinding().GetResultIndex();
-
- YQL_ENSURE(QueryState->QueryData->HasResult(txIndex, resultIndex));
- auto g = QueryState->QueryData->TypeEnv().BindAllocator();
- auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, arena.get());
+ auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), arena.get());
std::optional<IDataProvider::TFillSettings> fillSettings;
if (QueryState->PreparedQuery->ResultsSize()) {
YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""