diff options
author | gvit <gvit@ydb.tech> | 2023-10-26 23:12:43 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-10-26 23:32:48 +0300 |
commit | 3be69bf3deb6f06121d3f4316f88c2a9532675d6 (patch) | |
tree | 90d239a32fcad45962465d7467e7b6f70a145430 | |
parent | 9fc3c43b3476e61fe0df7d4c9906e44768bc7fa8 (diff) | |
download | ydb-3be69bf3deb6f06121d3f4316f88c2a9532675d6.tar.gz |
refactor alter table: use scheme executer
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 7 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp | 37 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/actors/scheme.h | 21 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 74 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 83 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 9 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_prepared_query.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 24 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table_ut.cpp | 8 |
13 files changed, 223 insertions, 64 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index c8bd2dfc9a..2a4fcd4fff 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -94,9 +94,10 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext); -IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database, - TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc, - bool temporary, TString SessionId); +IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, + const TMaybe<TString>& requestType, const TString& database, + TIntrusiveConstPtr<NACLib::TUserToken> userToken, + bool temporary, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx); std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral( IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr<TUserRequestContext>& userRequestContext); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index ae38be0628..5271170a0a 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -35,6 +35,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch&& rows) { YQL_ENSURE(idx < TxResults.size()); + YQL_ENSURE(AllocState); ResultRowsCount += rows.RowCount(); ResultRowsBytes += rows.Size(); auto guard = AllocState->TypeEnv.BindAllocator(); @@ -48,7 +49,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch } TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() { - if (!TxResults.empty()) { + if (!TxResults.empty() && Y_LIKELY(AllocState)) { with_lock(AllocState->Alloc) { TxResults.crop(0); } diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index b40abaf3a5..60757028b8 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -1,4 +1,5 @@ #include "kqp_executer.h" +#include "kqp_executer_impl.h" #include <ydb/core/kqp/gateway/actors/scheme.h> #include <ydb/core/kqp/gateway/local_rpc/helper.h> @@ -8,15 +9,10 @@ namespace NKikimr::NKqp { -#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream) -#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream) -#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream) - using namespace NThreading; namespace { - static bool CheckAlterAccess(const NACLib::TUserToken& userToken, const NSchemeCache::TSchemeCacheNavigate* navigate) { bool isDatabase = true; // first entry is always database @@ -53,20 +49,22 @@ public: return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR; } - TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database, - TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc, - bool temporary, TString sessionId) + TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TMaybe<TString>& requestType, + const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, + bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx) : PhyTx(phyTx) , Target(target) , Database(database) , UserToken(userToken) , Temporary(temporary) , SessionId(sessionId) + , RequestContext(std::move(ctx)) + , RequestType(requestType) { YQL_ENSURE(PhyTx); YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME); - ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(txAlloc); + ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(nullptr); } void StartBuildOperation() { @@ -85,6 +83,10 @@ public: ev->Record.SetUserToken(UserToken->GetSerializedToken()); } + if (RequestType) { + ev->Record.SetRequestType(*RequestType); + } + const auto& schemeOp = PhyTx->GetSchemeOperation(); switch (schemeOp.GetOperationCase()) { case NKqpProto::TKqpSchemeOperation::kCreateTable: { @@ -123,7 +125,7 @@ public: case NKqpProto::TKqpSchemeOperation::kAlterTable: { auto modifyScheme = schemeOp.GetAlterTable(); ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); - return; + break; } case NKqpProto::TKqpSchemeOperation::kBuildOperation: { @@ -269,6 +271,10 @@ public: IActor::PassAway(); } + const TIntrusivePtr<TUserRequestContext>& GetUserRequestContext() const { + return RequestContext; + } + void Handle(NSchemeShard::TEvIndexBuilder::TEvCreateResponse::TPtr& ev) { const auto& response = ev->Get()->Record; const auto status = response.GetStatus(); @@ -376,6 +382,7 @@ private: void UnexpectedEvent(const TString& state, ui32 eventType) { LOG_C("TKqpSchemeExecuter, unexpected event: " << eventType << ", at state:" << state << ", selfID: " << SelfId()); + InternalError(TStringBuilder() << "Unexpected event at TKqpSchemeExecuter, state: " << state << ", event: " << eventType); } @@ -419,14 +426,18 @@ private: ui64 TxId = 0; TActorId SchemePipeActorId_; ui64 SchemeShardTabletId = 0; + TIntrusivePtr<TUserRequestContext> RequestContext; + const TMaybe<TString> RequestType; }; } // namespace -IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database, - TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc, bool temporary, TString sessionId) +IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, + const TMaybe<TString>& requestType, const TString& database, + TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool temporary, TString sessionId, + TIntrusivePtr<TUserRequestContext> ctx) { - return new TKqpSchemeExecuter(phyTx, target, database, userToken, txAlloc, temporary, sessionId); + return new TKqpSchemeExecuter(phyTx, target, requestType, database, userToken, temporary, sessionId, std::move(ctx)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/gateway/actors/scheme.h b/ydb/core/kqp/gateway/actors/scheme.h index 1ea2d8ccdd..d4473472d9 100644 --- a/ydb/core/kqp/gateway/actors/scheme.h +++ b/ydb/core/kqp/gateway/actors/scheme.h @@ -135,6 +135,27 @@ public: return; } + case NKikimrScheme::EStatus::StatusSchemeError: { + Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_SCHEME_ERROR, + response.GetSchemeShardReason(), {})); + this->Die(ctx); + return; + } + + case NKikimrScheme::EStatus::StatusPreconditionFailed: { + Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, + response.GetSchemeShardReason(), {})); + this->Die(ctx); + return; + } + + case NKikimrScheme::EStatus::StatusInvalidParameter: { + Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_BAD_REQUEST, + response.GetSchemeShardReason(), {})); + this->Die(ctx); + return; + } + default: break; } diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 53e173e775..31e7e7c5fd 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -467,6 +467,55 @@ private: TVector<NYql::NDqProto::TDqExecutionStats> Executions; }; +class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExecuterRequestHandler> { +public: + using TResult = IKqpGateway::TGenericResult; + + TKqpSchemeExecuterRequestHandler(TKqpPhyTxHolder::TConstPtr phyTx, const TMaybe<TString>& requestType, const TString& database, + TIntrusiveConstPtr<NACLib::TUserToken> userToken, TPromise<TResult> promise) + : PhyTx(std::move(phyTx)) + , Database(database) + , UserToken(std::move(userToken)) + , Promise(promise) + , RequestType(requestType) + {} + + void Bootstrap() { + auto ctx = MakeIntrusive<TUserRequestContext>(); + IActor* actor = CreateKqpSchemeExecuter(PhyTx, SelfId(), RequestType, Database, UserToken, false /* temporary */, TString() /* sessionId */, ctx); + Register(actor); + Become(&TThis::WaitState); + } + + STATEFN(WaitState) { + switch(ev->GetTypeRewrite()) { + hFunc(TEvKqpExecuter::TEvTxResponse, Handle); + } + } + + void Handle(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { + auto* response = ev->Get()->Record.MutableResponse(); + + TResult result; + if (response->GetStatus() == Ydb::StatusIds::SUCCESS) { + result.SetSuccess(); + } else { + for (auto& issue : response->GetIssues()) { + result.AddIssue(NYql::IssueFromMessage(issue)); + } + } + + Promise.SetValue(result); + this->PassAway(); + } + +private: + TKqpPhyTxHolder::TConstPtr PhyTx; + const TString Database; + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + TPromise<TResult> Promise; + const TMaybe<TString> RequestType; +}; class TKqpExecLiteralRequestHandler: public TActorBootstrapped<TKqpExecLiteralRequestHandler> { public: @@ -798,23 +847,11 @@ public: } } - TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, - const TMaybe<TString>& requestType, ui64 flags) override + TFuture<TGenericResult> AlterTable(const TString&, Ydb::Table::AlterTableRequest&&, const TMaybe<TString>&, ui64) override { try { - YQL_ENSURE(!flags); //Supported only for prepared mode - if (!CheckCluster(cluster)) { - return InvalidCluster<TGenericResult>(cluster); - } - - // FIXME: should be defined in grpc_services/rpc_calls.h, but cause cyclic dependency - using namespace NGRpcService; - using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest, - Ydb::Table::AlterTableResponse>; - - return SendLocalRpcRequestNoResult<TEvAlterTableRequest>(std::move(req), Database, GetTokenCompat(), requestType); - } - catch (yexception& e) { + YQL_ENSURE(false, "gateway doesn't implement alter"); + } catch (yexception& e) { return MakeFuture(ResultFromException<TGenericResult>(e)); } } @@ -2075,6 +2112,13 @@ private: return promise.GetFuture(); } + TFuture<TGenericResult> SendSchemeExecuterRequest(const TString&, const TMaybe<TString>& requestType, const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder>& phyTx) override { + auto promise = NewPromise<TGenericResult>(); + IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, requestType, Database, UserToken, promise); + RegisterActor(requestHandler); + return promise.GetFuture(); + } + template<typename TRpc> TFuture<TGenericResult> SendLocalRpcRequestNoResult(typename TRpc::TRequest&& proto, const TString& databse, const TString& token, const TMaybe<TString>& requestType = {}) { return NRpcService::DoLocalRpc<TRpc>(std::move(proto), databse, token, requestType, ActorSystem).Apply([](NThreading::TFuture<typename TRpc::TResponse> future) { diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 568715885a..ca56c8aac0 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -544,18 +544,10 @@ public: return tablePromise.GetFuture(); } - TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, - const TMaybe<TString>& requestType, ui64 flags) override + TFuture<TGenericResult> PrepareAlterTable(const TString&, Ydb::Table::AlterTableRequest&& req, + const TMaybe<TString>&, ui64 flags) { - CHECK_PREPARED_DDL(AlterTable); - - if (!IsPrepare()) { - return Gateway->AlterTable(cluster, std::move(req), requestType, flags); - } - auto &phyQuery = - *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); - auto &phyTx = *phyQuery.AddTransactions(); - phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + YQL_ENSURE(SessionCtx->Query().PreparingQuery); auto promise = NewPromise<TGenericResult>(); const auto ops = GetAlterOperationKinds(&req); if (ops.size() != 1) { @@ -571,6 +563,10 @@ public: const auto opType = *ops.begin(); auto tablePromise = NewPromise<TGenericResult>(); if (opType == EAlterOperationKind::AddIndex) { + auto &phyQuery = + *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto &phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); auto buildOp = phyTx.MutableSchemeOperation()->MutableBuildOperation(); Ydb::StatusIds::StatusCode code; TString error; @@ -602,6 +598,8 @@ public: auto &phyQuery = *sessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); auto &phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + auto alter = phyTx.MutableSchemeOperation()->MutableAlterTable(); const TPathId invalidPathId; Ydb::StatusIds::StatusCode code; @@ -621,6 +619,69 @@ public: return tablePromise.GetFuture(); } + TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster, const TMaybe<TString>& requestType, + const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder> &phyTx) override + { + return Gateway->SendSchemeExecuterRequest(cluster, requestType, phyTx); + } + + TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, + const TMaybe<TString>& requestType, ui64 flags) override + { + CHECK_PREPARED_DDL(AlterTable); + + auto tablePromise = NewPromise<TGenericResult>(); + + if (!IsPrepare()) { + SessionCtx->Query().PrepareOnly = false; + if (SessionCtx->Query().PreparingQuery) { + auto code = Ydb::StatusIds::BAD_REQUEST; + auto error = TStringBuilder() << "multiple transactions are not supported for alter table operation."; + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue(error)); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); + tablePromise.SetValue(errResult); + return tablePromise.GetFuture(); + } + + SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); + } + + auto prepareFuture = PrepareAlterTable(cluster, std::move(req), requestType, flags); + if (IsPrepare()) + return prepareFuture; + + auto sessionCtx = SessionCtx; + auto gateway = Gateway; + prepareFuture.Subscribe([cluster, requestType, tablePromise, sessionCtx, gateway](const TFuture<IKqpGateway::TGenericResult> &future) mutable { + auto result = future.GetValue(); + TPreparedQueryHolder::TConstPtr preparedQuery = std::make_shared<TPreparedQueryHolder>(sessionCtx->Query().PreparingQuery.release(), nullptr); + if (result.Success()) { + auto executeFuture = gateway->SendSchemeExecuterRequest(cluster, requestType, preparedQuery->GetPhyTx(0)); + executeFuture.Subscribe([tablePromise](const TFuture<IKqpGateway::TGenericResult> &future) mutable { + auto fresult = future.GetValue(); + if (fresult.Success()) { + TGenericResult result; + result.SetSuccess(); + tablePromise.SetValue(result); + } else { + tablePromise.SetValue( + ResultFromIssues<TGenericResult>(fresult.Status(), fresult.Issues()) + ); + } + }); + return; + } else { + tablePromise.SetValue(ResultFromIssues<TGenericResult>( + result.Status(), result.Issues())); + + return; + } + }); + + return tablePromise.GetFuture(); + } + TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) override { FORWARD_ENSURE_NO_PREPARE(RenameTable, src, dst, cluster); } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index e6707c99f4..5a5d9a46b8 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -16,6 +16,7 @@ #include <ydb/services/metadata/manager/abstract.h> #include <ydb/core/kqp/query_data/kqp_query_data.h> +#include <ydb/core/kqp/query_data/kqp_prepared_query.h> #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/protos/kqp.pb.h> #include <ydb/core/scheme/scheme_types_proto.h> @@ -28,6 +29,10 @@ namespace NKikimr { namespace NMiniKQL { class IFunctionRegistry; } + + namespace NKqp { + class TKqpPhyTxHolder; + } } namespace NYql { @@ -767,6 +772,10 @@ public: virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0; + virtual NThreading::TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster, + const TMaybe<TString>& requestType, + const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder> &phyTx) = 0; + virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req, const TMaybe<TString>& requestType, ui64 flags) = 0; diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index 4a5a10acd6..0be3943a0e 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -74,6 +74,7 @@ TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPrepar const auto& txResult = Proto->GetResults(i); auto& result = TxResultsMeta[i]; + YQL_ENSURE(Alloc); result.MkqlItemType = ImportTypeFromProto(txResult.GetItemType(), Alloc->TypeEnv); //Hack to prevent data race. Side effect of IsPresortSupported - fill cached value. //So no more concurent write subsequently @@ -107,9 +108,14 @@ const NKikimr::NKqp::TStagePredictor& TKqpPhyTxHolder::GetCalculationPredictor(c TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) : Proto(proto) - , Alloc(std::move(std::make_shared<TPreparedQueryAllocHolder>(functionRegistry))) + , Alloc(nullptr) , TableConstInfoById(MakeIntrusive<TTableConstInfoMap>()) { + + if (functionRegistry) { + Alloc = std::make_shared<TPreparedQueryAllocHolder>(functionRegistry); + } + THashSet<TString> tablesSet; const auto& phyQuery = Proto->GetPhysicalQuery(); Transactions.reserve(phyQuery.TransactionsSize()); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 2b7b4f3d82..26e84891f4 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -139,6 +139,10 @@ public: return RequestEv->GetSyntax(); } + const TString& GetRequestType() const { + return RequestEv->GetRequestType(); + } + std::shared_ptr<std::map<TString, Ydb::Type>> GetQueryParameterTypes() const { return QueryParameterTypes; } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 2f6dcf2f4a..c5d3197cb4 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1040,10 +1040,11 @@ public: void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) { auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(); + const TString requestType = QueryState ? QueryState->GetRequestType() : TString(); bool temporary = GetTemporaryTableInfo(tx).has_value(); - auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), Settings.Database, userToken, - QueryState->TxCtx->TxAlloc, temporary, *TempTablesState.SessionId); + auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), requestType, Settings.Database, userToken, + temporary, *TempTablesState.SessionId, QueryState->UserRequestContext); ExecuterId = RegisterWithSameMailbox(executerActor); } diff --git a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp index be18255b04..e58d78fc33 100644 --- a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp @@ -24,7 +24,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { )"); auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); } { @@ -1031,7 +1031,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { )"); auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); } { diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 9cc82eeb8f..cc7dcfd781 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -1151,7 +1151,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { )"; const auto result = session.ExecuteSchemeQuery(query << ";").GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); } { @@ -1459,13 +1459,13 @@ Y_UNIT_TEST_SUITE(KqpScheme) { createTable(R"(Interval("-P1D") ON Ts)", EStatus::GENERIC_ERROR, "Interval value cannot be negative"); - createTable(R"(Interval("P1D") ON CreatedAt)", EStatus::GENERIC_ERROR, "Cannot enable TTL on unknown column"); + createTable(R"(Interval("P1D") ON CreatedAt)", EStatus::SCHEME_ERROR, "Cannot enable TTL on unknown column"); - createTable(R"(Interval("P1D") ON StringValue)", EStatus::GENERIC_ERROR, "Unsupported column type"); + createTable(R"(Interval("P1D") ON StringValue)", EStatus::SCHEME_ERROR, "Unsupported column type"); - createTable(R"(Interval("P1D") ON Uint32Value)", EStatus::GENERIC_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified"); - createTable(R"(Interval("P1D") ON Uint64Value)", EStatus::GENERIC_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified"); - createTable(R"(Interval("P1D") ON DyNumberValue)", EStatus::GENERIC_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified"); + createTable(R"(Interval("P1D") ON Uint32Value)", EStatus::SCHEME_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified"); + createTable(R"(Interval("P1D") ON Uint64Value)", EStatus::SCHEME_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified"); + createTable(R"(Interval("P1D") ON DyNumberValue)", EStatus::SCHEME_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified"); createTable(R"(Interval("P1D") ON Ts)"); { @@ -2409,7 +2409,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { )"; auto session = db.CreateSession().GetValueSync().GetSession(); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); } } @@ -3568,7 +3568,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { )"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); } } @@ -4217,7 +4217,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { STORE = COLUMN );)"; result = session.ExecuteSchemeQuery(query3).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); } Y_UNIT_TEST(CreateAlterDropColumnTableInStore) { @@ -4358,7 +4358,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 );)"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); } Y_UNIT_TEST(CreateDropColumnTableNegative) { @@ -4399,7 +4399,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1 );)"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); } { // disallow nullable key @@ -4416,7 +4416,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1 );)"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); } } diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index 97d2b03f33..3659095f71 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -149,7 +149,7 @@ Y_UNIT_TEST_SUITE(YdbYqlClient) { ); )___").ExtractValueSync(); UNIT_ASSERT_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); } { @@ -161,7 +161,7 @@ Y_UNIT_TEST_SUITE(YdbYqlClient) { ); )___").ExtractValueSync(); UNIT_ASSERT_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); } } @@ -1147,11 +1147,11 @@ R"___(<main>: Error: Transaction not found: , code: 2015 PRIMARY KEY (Key) ); )___").ExtractValueSync(); - UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); auto ref = R"___(<main>: Error: Execution, code: 1060 <main>:5:30: Error: Executing CREATE TABLE - <main>: Error: Scheme operation failed, status: ExecError, reason: Column Key has wrong key type Double + <main>: Error: Column Key has wrong key type Double, code: 2003 )___"; UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), ref); } |