diff options
author | gvit <gvit@ydb.tech> | 2023-11-23 19:53:26 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-11-23 21:01:21 +0300 |
commit | ac691915bc3127118492c2ec79e30ed5942a4544 (patch) | |
tree | dd66e9b17cb7ac99ea3fbdc34527792a22dae8fb | |
parent | dba7ceaa399571511c21d2007ba1ab9f477e6be9 (diff) | |
download | ydb-ac691915bc3127118492c2ec79e30ed5942a4544.tar.gz |
exists/not exists semantic fix: supported only in query service
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_gateway.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_worker_actor.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 2 |
9 files changed, 57 insertions, 31 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 91d2a8a0c8..5dcc1b60d7 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -176,7 +176,7 @@ private: std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>( TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); - Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Database, std::move(loader), + Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader), ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters); Gateway->SetToken(QueryId.Cluster, UserToken); diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index fa8c05d378..6d4cef595b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -93,7 +93,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext); -IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, +IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType, const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool temporary, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx); diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 2fdbf74c10..13f4c3ba93 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -49,10 +49,11 @@ public: return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR; } - TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TMaybe<TString>& requestType, + TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType, const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx) : PhyTx(phyTx) + , QueryType(queryType) , Target(target) , Database(database) , UserToken(userToken) @@ -157,8 +158,14 @@ public: auto promise = NewPromise<IKqpGateway::TGenericResult>(); - bool successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist(); - bool failedOnAlreadyExists = ev->Record.GetTransaction().GetModifyScheme().GetFailedOnAlreadyExists(); + bool successOnNotExist = false; + bool failedOnAlreadyExists = false; + // exists/not exists semantics supported only in the query service. + if (IsQueryService()) { + successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist(); + failedOnAlreadyExists = ev->Record.GetTransaction().GetModifyScheme().GetFailedOnAlreadyExists(); + } + IActor* requestHandler = new TSchemeOpRequestHandler( ev.Release(), promise, @@ -403,6 +410,19 @@ private: << ", event: " << eventType); } + bool IsQueryService() const { + + switch(QueryType) { + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: + return true; + default: + return false; + } + + } + void InternalError(const NYql::TIssues& issues) { LOG_E(issues.ToOneLineString()); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, @@ -433,6 +453,7 @@ private: private: TKqpPhyTxHolder::TConstPtr PhyTx; + const NKikimrKqp::EQueryType QueryType; const TActorId Target; const TString Database; const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; @@ -448,12 +469,12 @@ private: } // namespace -IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, +IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType, const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx) { - return new TKqpSchemeExecuter(phyTx, target, requestType, database, userToken, temporary, sessionId, std::move(ctx)); + return new TKqpSchemeExecuter(phyTx, queryType, target, requestType, database, userToken, temporary, sessionId, std::move(ctx)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 42f43b0240..73b2cc06fd 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -196,7 +196,7 @@ public: std::shared_ptr<NGRpcService::IRequestCtxMtSafe> rpcCtx) = 0; }; -TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const TString& database, +TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index f9d0c0e262..78f3a10dc0 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -431,9 +431,10 @@ class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExec public: using TResult = IKqpGateway::TGenericResult; - TKqpSchemeExecuterRequestHandler(TKqpPhyTxHolder::TConstPtr phyTx, const TMaybe<TString>& requestType, const TString& database, + TKqpSchemeExecuterRequestHandler(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TMaybe<TString>& requestType, const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, TPromise<TResult> promise) : PhyTx(std::move(phyTx)) + , QueryType(queryType) , Database(database) , UserToken(std::move(userToken)) , Promise(promise) @@ -442,7 +443,7 @@ public: void Bootstrap() { auto ctx = MakeIntrusive<TUserRequestContext>(); - IActor* actor = CreateKqpSchemeExecuter(PhyTx, SelfId(), RequestType, Database, UserToken, false /* temporary */, TString() /* sessionId */, ctx); + IActor* actor = CreateKqpSchemeExecuter(PhyTx, QueryType, SelfId(), RequestType, Database, UserToken, false /* temporary */, TString() /* sessionId */, ctx); Register(actor); Become(&TThis::WaitState); } @@ -471,6 +472,7 @@ public: private: TKqpPhyTxHolder::TConstPtr PhyTx; + const NKikimrKqp::EQueryType QueryType; const TString Database; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TPromise<TResult> Promise; @@ -594,9 +596,10 @@ private: using TNavigate = NSchemeCache::TSchemeCacheNavigate; public: - TKikimrIcGateway(const TString& cluster, const TString& database, std::shared_ptr<IKqpTableMetadataLoader>&& metadataLoader, + TKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr<IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters) : Cluster(cluster) + , QueryType(queryType) , Database(database) , ActorSystem(actorSystem) , NodeId(nodeId) @@ -2085,7 +2088,7 @@ private: TFuture<TGenericResult> SendSchemeExecuterRequest(const TString&, const TMaybe<TString>& requestType, const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder>& phyTx) override { auto promise = NewPromise<TGenericResult>(); - IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, requestType, Database, UserToken, promise); + IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, QueryType, requestType, Database, UserToken, promise); RegisterActor(requestHandler); return promise.GetFuture(); } @@ -2237,6 +2240,7 @@ private: private: TString Cluster; + const NKikimrKqp::EQueryType QueryType; TString Database; TActorSystem* ActorSystem; ui32 NodeId; @@ -2248,11 +2252,11 @@ private: } // namespace -TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const TString& database, +TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters) { - return MakeIntrusive<TKikimrIcGateway>(cluster, database, std::move(metadataLoader), actorSystem, nodeId, + return MakeIntrusive<TKikimrIcGateway>(cluster, queryType, database, std::move(metadataLoader), actorSystem, nodeId, counters); } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index 87b60e866d..f88601de17 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -74,7 +74,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) { counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false); - return CreateKikimrIcGateway(TestCluster, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), + return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), server.GetRuntime()->GetNodeId(0), counters); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c87164a2c6..94620b6427 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1045,11 +1045,13 @@ public: } void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) { - auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(); - const TString requestType = QueryState ? QueryState->GetRequestType() : TString(); + YQL_ENSURE(QueryState); + + auto userToken = QueryState->UserToken; + const TString requestType = QueryState->GetRequestType(); bool temporary = GetTemporaryTableInfo(tx).has_value(); - auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), requestType, Settings.Database, userToken, + auto executerActor = CreateKqpSchemeExecuter(tx, QueryState->GetType(), SelfId(), requestType, Settings.Database, userToken, temporary, *TempTablesState.SessionId, QueryState->UserRequestContext); ExecuterId = RegisterWithSameMailbox(executerActor); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index e1c274e5b0..4388a60913 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -131,20 +131,9 @@ public: RequestCounters->TxProxyMon = MakeIntrusive<NTxProxy::TTxProxyMon>(AppData()->Counters); } - void Bootstrap(const TActorContext& ctx) { + void Bootstrap(const TActorContext&) { LOG_D("Worker bootstrapped"); Counters->ReportWorkerCreated(Settings.DbCounters); - - std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>( - TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); - Gateway = CreateKikimrIcGateway(Settings.Cluster, Settings.Database, std::move(loader), - ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters); - - Config->FeatureFlags = AppData(ctx)->FeatureFlags; - - KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, - FederatedQuerySetup, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false); - Become(&TKqpWorkerActor::ReadyState); } @@ -189,6 +178,16 @@ public: QueryState->Sender = ev->Sender; QueryState->RequestEv.reset(ev->Release().Release()); + std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>( + TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); + Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader), + ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters); + + Config->FeatureFlags = AppData(ctx)->FeatureFlags; + + KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, + FederatedQuerySetup, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false); + auto& queryRequest = QueryState->RequestEv; QueryState->ProxyRequestId = proxyRequestId; QueryState->KeepSession = Settings.LongSession || queryRequest->GetKeepSession(); diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 64ff8b71f4..df7549c58a 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -35,7 +35,7 @@ TIntrusivePtr<NKqp::IKqpGateway> GetIcGateway(Tests::TServer& server) { counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters); counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr),false); - return NKqp::CreateKikimrIcGateway(TestCluster, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), + return NKqp::CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), server.GetRuntime()->GetNodeId(0), counters); } |