aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-11-23 19:53:26 +0300
committergvit <gvit@ydb.tech>2023-11-23 21:01:21 +0300
commitac691915bc3127118492c2ec79e30ed5942a4544 (patch)
treedd66e9b17cb7ac99ea3fbdc34527792a22dae8fb
parentdba7ceaa399571511c21d2007ba1ab9f477e6be9 (diff)
downloadydb-ac691915bc3127118492c2ec79e30ed5942a4544.tar.gz
exists/not exists semantic fix: supported only in query service
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp31
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h2
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp16
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp8
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp23
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp2
9 files changed, 57 insertions, 31 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 91d2a8a0c8..5dcc1b60d7 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -176,7 +176,7 @@ private:
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
- Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Database, std::move(loader),
+ Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters);
Gateway->SetToken(QueryId.Cluster, UserToken);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index fa8c05d378..6d4cef595b 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -93,7 +93,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
-IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target,
+IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
const TMaybe<TString>& requestType, const TString& database,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
bool temporary, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx);
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index 2fdbf74c10..13f4c3ba93 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -49,10 +49,11 @@ public:
return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
}
- TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TMaybe<TString>& requestType,
+ TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken,
bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx)
: PhyTx(phyTx)
+ , QueryType(queryType)
, Target(target)
, Database(database)
, UserToken(userToken)
@@ -157,8 +158,14 @@ public:
auto promise = NewPromise<IKqpGateway::TGenericResult>();
- bool successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist();
- bool failedOnAlreadyExists = ev->Record.GetTransaction().GetModifyScheme().GetFailedOnAlreadyExists();
+ bool successOnNotExist = false;
+ bool failedOnAlreadyExists = false;
+ // exists/not exists semantics supported only in the query service.
+ if (IsQueryService()) {
+ successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist();
+ failedOnAlreadyExists = ev->Record.GetTransaction().GetModifyScheme().GetFailedOnAlreadyExists();
+ }
+
IActor* requestHandler = new TSchemeOpRequestHandler(
ev.Release(),
promise,
@@ -403,6 +410,19 @@ private:
<< ", event: " << eventType);
}
+ bool IsQueryService() const {
+
+ switch(QueryType) {
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
+ return true;
+ default:
+ return false;
+ }
+
+ }
+
void InternalError(const NYql::TIssues& issues) {
LOG_E(issues.ToOneLineString());
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED,
@@ -433,6 +453,7 @@ private:
private:
TKqpPhyTxHolder::TConstPtr PhyTx;
+ const NKikimrKqp::EQueryType QueryType;
const TActorId Target;
const TString Database;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
@@ -448,12 +469,12 @@ private:
} // namespace
-IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target,
+IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
const TMaybe<TString>& requestType, const TString& database,
TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool temporary, TString sessionId,
TIntrusivePtr<TUserRequestContext> ctx)
{
- return new TKqpSchemeExecuter(phyTx, target, requestType, database, userToken, temporary, sessionId, std::move(ctx));
+ return new TKqpSchemeExecuter(phyTx, queryType, target, requestType, database, userToken, temporary, sessionId, std::move(ctx));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index 42f43b0240..73b2cc06fd 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -196,7 +196,7 @@ public:
std::shared_ptr<NGRpcService::IRequestCtxMtSafe> rpcCtx) = 0;
};
-TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const TString& database,
+TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,
std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem,
ui32 nodeId, TKqpRequestCounters::TPtr counters);
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index f9d0c0e262..78f3a10dc0 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -431,9 +431,10 @@ class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExec
public:
using TResult = IKqpGateway::TGenericResult;
- TKqpSchemeExecuterRequestHandler(TKqpPhyTxHolder::TConstPtr phyTx, const TMaybe<TString>& requestType, const TString& database,
+ TKqpSchemeExecuterRequestHandler(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TMaybe<TString>& requestType, const TString& database,
TIntrusiveConstPtr<NACLib::TUserToken> userToken, TPromise<TResult> promise)
: PhyTx(std::move(phyTx))
+ , QueryType(queryType)
, Database(database)
, UserToken(std::move(userToken))
, Promise(promise)
@@ -442,7 +443,7 @@ public:
void Bootstrap() {
auto ctx = MakeIntrusive<TUserRequestContext>();
- IActor* actor = CreateKqpSchemeExecuter(PhyTx, SelfId(), RequestType, Database, UserToken, false /* temporary */, TString() /* sessionId */, ctx);
+ IActor* actor = CreateKqpSchemeExecuter(PhyTx, QueryType, SelfId(), RequestType, Database, UserToken, false /* temporary */, TString() /* sessionId */, ctx);
Register(actor);
Become(&TThis::WaitState);
}
@@ -471,6 +472,7 @@ public:
private:
TKqpPhyTxHolder::TConstPtr PhyTx;
+ const NKikimrKqp::EQueryType QueryType;
const TString Database;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TPromise<TResult> Promise;
@@ -594,9 +596,10 @@ private:
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
public:
- TKikimrIcGateway(const TString& cluster, const TString& database, std::shared_ptr<IKqpTableMetadataLoader>&& metadataLoader,
+ TKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database, std::shared_ptr<IKqpTableMetadataLoader>&& metadataLoader,
TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters)
: Cluster(cluster)
+ , QueryType(queryType)
, Database(database)
, ActorSystem(actorSystem)
, NodeId(nodeId)
@@ -2085,7 +2088,7 @@ private:
TFuture<TGenericResult> SendSchemeExecuterRequest(const TString&, const TMaybe<TString>& requestType, const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder>& phyTx) override {
auto promise = NewPromise<TGenericResult>();
- IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, requestType, Database, UserToken, promise);
+ IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, QueryType, requestType, Database, UserToken, promise);
RegisterActor(requestHandler);
return promise.GetFuture();
}
@@ -2237,6 +2240,7 @@ private:
private:
TString Cluster;
+ const NKikimrKqp::EQueryType QueryType;
TString Database;
TActorSystem* ActorSystem;
ui32 NodeId;
@@ -2248,11 +2252,11 @@ private:
} // namespace
-TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const TString& database,
+TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem,
ui32 nodeId, TKqpRequestCounters::TPtr counters)
{
- return MakeIntrusive<TKikimrIcGateway>(cluster, database, std::move(metadataLoader), actorSystem, nodeId,
+ return MakeIntrusive<TKikimrIcGateway>(cluster, queryType, database, std::move(metadataLoader), actorSystem, nodeId,
counters);
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index 87b60e866d..f88601de17 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -74,7 +74,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) {
counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr), false);
- return CreateKikimrIcGateway(TestCluster, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
+ return CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
server.GetRuntime()->GetNodeId(0), counters);
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c87164a2c6..94620b6427 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1045,11 +1045,13 @@ public:
}
void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) {
- auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>();
- const TString requestType = QueryState ? QueryState->GetRequestType() : TString();
+ YQL_ENSURE(QueryState);
+
+ auto userToken = QueryState->UserToken;
+ const TString requestType = QueryState->GetRequestType();
bool temporary = GetTemporaryTableInfo(tx).has_value();
- auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), requestType, Settings.Database, userToken,
+ auto executerActor = CreateKqpSchemeExecuter(tx, QueryState->GetType(), SelfId(), requestType, Settings.Database, userToken,
temporary, *TempTablesState.SessionId, QueryState->UserRequestContext);
ExecuterId = RegisterWithSameMailbox(executerActor);
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index e1c274e5b0..4388a60913 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -131,20 +131,9 @@ public:
RequestCounters->TxProxyMon = MakeIntrusive<NTxProxy::TTxProxyMon>(AppData()->Counters);
}
- void Bootstrap(const TActorContext& ctx) {
+ void Bootstrap(const TActorContext&) {
LOG_D("Worker bootstrapped");
Counters->ReportWorkerCreated(Settings.DbCounters);
-
- std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(
- TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
- Gateway = CreateKikimrIcGateway(Settings.Cluster, Settings.Database, std::move(loader),
- ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters);
-
- Config->FeatureFlags = AppData(ctx)->FeatureFlags;
-
- KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver,
- FederatedQuerySetup, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
-
Become(&TKqpWorkerActor::ReadyState);
}
@@ -189,6 +178,16 @@ public:
QueryState->Sender = ev->Sender;
QueryState->RequestEv.reset(ev->Release().Release());
+ std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(
+ TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
+ Gateway = CreateKikimrIcGateway(Settings.Cluster, QueryState->RequestEv->GetType(), Settings.Database, std::move(loader),
+ ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters);
+
+ Config->FeatureFlags = AppData(ctx)->FeatureFlags;
+
+ KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver,
+ FederatedQuerySetup, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
+
auto& queryRequest = QueryState->RequestEv;
QueryState->ProxyRequestId = proxyRequestId;
QueryState->KeepSession = Settings.LongSession || queryRequest->GetKeepSession();
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 64ff8b71f4..df7549c58a 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -35,7 +35,7 @@ TIntrusivePtr<NKqp::IKqpGateway> GetIcGateway(Tests::TServer& server) {
counters->Counters = new TKqpCounters(server.GetRuntime()->GetAppData(0).Counters);
counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(),TIntrusivePtr<NYql::TKikimrConfiguration>(nullptr),false);
- return NKqp::CreateKikimrIcGateway(TestCluster, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
+ return NKqp::CreateKikimrIcGateway(TestCluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
server.GetRuntime()->GetNodeId(0), counters);
}