aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-10-26 23:12:43 +0300
committergvit <gvit@ydb.tech>2023-10-26 23:32:48 +0300
commit3be69bf3deb6f06121d3f4316f88c2a9532675d6 (patch)
tree90d239a32fcad45962465d7467e7b6f70a145430
parent9fc3c43b3476e61fe0df7d4c9906e44768bc7fa8 (diff)
downloadydb-3be69bf3deb6f06121d3f4316f88c2a9532675d6.tar.gz
refactor alter table: use scheme executer
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h7
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp37
-rw-r--r--ydb/core/kqp/gateway/actors/scheme.h21
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp74
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp83
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h9
-rw-r--r--ydb/core/kqp/query_data/kqp_prepared_query.cpp8
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h4
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp5
-rw-r--r--ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp4
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp24
-rw-r--r--ydb/services/ydb/ydb_table_ut.cpp8
13 files changed, 223 insertions, 64 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index c8bd2dfc9a..2a4fcd4fff 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -94,9 +94,10 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
-IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database,
- TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc,
- bool temporary, TString SessionId);
+IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target,
+ const TMaybe<TString>& requestType, const TString& database,
+ TIntrusiveConstPtr<NACLib::TUserToken> userToken,
+ bool temporary, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx);
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index ae38be0628..5271170a0a 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -35,6 +35,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt
void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch&& rows) {
YQL_ENSURE(idx < TxResults.size());
+ YQL_ENSURE(AllocState);
ResultRowsCount += rows.RowCount();
ResultRowsBytes += rows.Size();
auto guard = AllocState->TypeEnv.BindAllocator();
@@ -48,7 +49,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch
}
TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() {
- if (!TxResults.empty()) {
+ if (!TxResults.empty() && Y_LIKELY(AllocState)) {
with_lock(AllocState->Alloc) {
TxResults.crop(0);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index b40abaf3a5..60757028b8 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -1,4 +1,5 @@
#include "kqp_executer.h"
+#include "kqp_executer_impl.h"
#include <ydb/core/kqp/gateway/actors/scheme.h>
#include <ydb/core/kqp/gateway/local_rpc/helper.h>
@@ -8,15 +9,10 @@
namespace NKikimr::NKqp {
-#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream)
-#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream)
-#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << ". " << stream)
-
using namespace NThreading;
namespace {
-
static bool CheckAlterAccess(const NACLib::TUserToken& userToken, const NSchemeCache::TSchemeCacheNavigate* navigate) {
bool isDatabase = true; // first entry is always database
@@ -53,20 +49,22 @@ public:
return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
}
- TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database,
- TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc,
- bool temporary, TString sessionId)
+ TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TMaybe<TString>& requestType,
+ const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken,
+ bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx)
: PhyTx(phyTx)
, Target(target)
, Database(database)
, UserToken(userToken)
, Temporary(temporary)
, SessionId(sessionId)
+ , RequestContext(std::move(ctx))
+ , RequestType(requestType)
{
YQL_ENSURE(PhyTx);
YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);
- ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(txAlloc);
+ ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(nullptr);
}
void StartBuildOperation() {
@@ -85,6 +83,10 @@ public:
ev->Record.SetUserToken(UserToken->GetSerializedToken());
}
+ if (RequestType) {
+ ev->Record.SetRequestType(*RequestType);
+ }
+
const auto& schemeOp = PhyTx->GetSchemeOperation();
switch (schemeOp.GetOperationCase()) {
case NKqpProto::TKqpSchemeOperation::kCreateTable: {
@@ -123,7 +125,7 @@ public:
case NKqpProto::TKqpSchemeOperation::kAlterTable: {
auto modifyScheme = schemeOp.GetAlterTable();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
- return;
+ break;
}
case NKqpProto::TKqpSchemeOperation::kBuildOperation: {
@@ -269,6 +271,10 @@ public:
IActor::PassAway();
}
+ const TIntrusivePtr<TUserRequestContext>& GetUserRequestContext() const {
+ return RequestContext;
+ }
+
void Handle(NSchemeShard::TEvIndexBuilder::TEvCreateResponse::TPtr& ev) {
const auto& response = ev->Get()->Record;
const auto status = response.GetStatus();
@@ -376,6 +382,7 @@ private:
void UnexpectedEvent(const TString& state, ui32 eventType) {
LOG_C("TKqpSchemeExecuter, unexpected event: " << eventType
<< ", at state:" << state << ", selfID: " << SelfId());
+
InternalError(TStringBuilder() << "Unexpected event at TKqpSchemeExecuter, state: " << state
<< ", event: " << eventType);
}
@@ -419,14 +426,18 @@ private:
ui64 TxId = 0;
TActorId SchemePipeActorId_;
ui64 SchemeShardTabletId = 0;
+ TIntrusivePtr<TUserRequestContext> RequestContext;
+ const TMaybe<TString> RequestType;
};
} // namespace
-IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database,
- TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc, bool temporary, TString sessionId)
+IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target,
+ const TMaybe<TString>& requestType, const TString& database,
+ TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool temporary, TString sessionId,
+ TIntrusivePtr<TUserRequestContext> ctx)
{
- return new TKqpSchemeExecuter(phyTx, target, database, userToken, txAlloc, temporary, sessionId);
+ return new TKqpSchemeExecuter(phyTx, target, requestType, database, userToken, temporary, sessionId, std::move(ctx));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/gateway/actors/scheme.h b/ydb/core/kqp/gateway/actors/scheme.h
index 1ea2d8ccdd..d4473472d9 100644
--- a/ydb/core/kqp/gateway/actors/scheme.h
+++ b/ydb/core/kqp/gateway/actors/scheme.h
@@ -135,6 +135,27 @@ public:
return;
}
+ case NKikimrScheme::EStatus::StatusSchemeError: {
+ Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_SCHEME_ERROR,
+ response.GetSchemeShardReason(), {}));
+ this->Die(ctx);
+ return;
+ }
+
+ case NKikimrScheme::EStatus::StatusPreconditionFailed: {
+ Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
+ response.GetSchemeShardReason(), {}));
+ this->Die(ctx);
+ return;
+ }
+
+ case NKikimrScheme::EStatus::StatusInvalidParameter: {
+ Promise.SetValue(NYql::NCommon::ResultFromIssues<TResult>(NYql::TIssuesIds::KIKIMR_BAD_REQUEST,
+ response.GetSchemeShardReason(), {}));
+ this->Die(ctx);
+ return;
+ }
+
default:
break;
}
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 53e173e775..31e7e7c5fd 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -467,6 +467,55 @@ private:
TVector<NYql::NDqProto::TDqExecutionStats> Executions;
};
+class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExecuterRequestHandler> {
+public:
+ using TResult = IKqpGateway::TGenericResult;
+
+ TKqpSchemeExecuterRequestHandler(TKqpPhyTxHolder::TConstPtr phyTx, const TMaybe<TString>& requestType, const TString& database,
+ TIntrusiveConstPtr<NACLib::TUserToken> userToken, TPromise<TResult> promise)
+ : PhyTx(std::move(phyTx))
+ , Database(database)
+ , UserToken(std::move(userToken))
+ , Promise(promise)
+ , RequestType(requestType)
+ {}
+
+ void Bootstrap() {
+ auto ctx = MakeIntrusive<TUserRequestContext>();
+ IActor* actor = CreateKqpSchemeExecuter(PhyTx, SelfId(), RequestType, Database, UserToken, false /* temporary */, TString() /* sessionId */, ctx);
+ Register(actor);
+ Become(&TThis::WaitState);
+ }
+
+ STATEFN(WaitState) {
+ switch(ev->GetTypeRewrite()) {
+ hFunc(TEvKqpExecuter::TEvTxResponse, Handle);
+ }
+ }
+
+ void Handle(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
+ auto* response = ev->Get()->Record.MutableResponse();
+
+ TResult result;
+ if (response->GetStatus() == Ydb::StatusIds::SUCCESS) {
+ result.SetSuccess();
+ } else {
+ for (auto& issue : response->GetIssues()) {
+ result.AddIssue(NYql::IssueFromMessage(issue));
+ }
+ }
+
+ Promise.SetValue(result);
+ this->PassAway();
+ }
+
+private:
+ TKqpPhyTxHolder::TConstPtr PhyTx;
+ const TString Database;
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+ TPromise<TResult> Promise;
+ const TMaybe<TString> RequestType;
+};
class TKqpExecLiteralRequestHandler: public TActorBootstrapped<TKqpExecLiteralRequestHandler> {
public:
@@ -798,23 +847,11 @@ public:
}
}
- TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
- const TMaybe<TString>& requestType, ui64 flags) override
+ TFuture<TGenericResult> AlterTable(const TString&, Ydb::Table::AlterTableRequest&&, const TMaybe<TString>&, ui64) override
{
try {
- YQL_ENSURE(!flags); //Supported only for prepared mode
- if (!CheckCluster(cluster)) {
- return InvalidCluster<TGenericResult>(cluster);
- }
-
- // FIXME: should be defined in grpc_services/rpc_calls.h, but cause cyclic dependency
- using namespace NGRpcService;
- using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest,
- Ydb::Table::AlterTableResponse>;
-
- return SendLocalRpcRequestNoResult<TEvAlterTableRequest>(std::move(req), Database, GetTokenCompat(), requestType);
- }
- catch (yexception& e) {
+ YQL_ENSURE(false, "gateway doesn't implement alter");
+ } catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}
@@ -2075,6 +2112,13 @@ private:
return promise.GetFuture();
}
+ TFuture<TGenericResult> SendSchemeExecuterRequest(const TString&, const TMaybe<TString>& requestType, const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder>& phyTx) override {
+ auto promise = NewPromise<TGenericResult>();
+ IActor* requestHandler = new TKqpSchemeExecuterRequestHandler(phyTx, requestType, Database, UserToken, promise);
+ RegisterActor(requestHandler);
+ return promise.GetFuture();
+ }
+
template<typename TRpc>
TFuture<TGenericResult> SendLocalRpcRequestNoResult(typename TRpc::TRequest&& proto, const TString& databse, const TString& token, const TMaybe<TString>& requestType = {}) {
return NRpcService::DoLocalRpc<TRpc>(std::move(proto), databse, token, requestType, ActorSystem).Apply([](NThreading::TFuture<typename TRpc::TResponse> future) {
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 568715885a..ca56c8aac0 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -544,18 +544,10 @@ public:
return tablePromise.GetFuture();
}
- TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
- const TMaybe<TString>& requestType, ui64 flags) override
+ TFuture<TGenericResult> PrepareAlterTable(const TString&, Ydb::Table::AlterTableRequest&& req,
+ const TMaybe<TString>&, ui64 flags)
{
- CHECK_PREPARED_DDL(AlterTable);
-
- if (!IsPrepare()) {
- return Gateway->AlterTable(cluster, std::move(req), requestType, flags);
- }
- auto &phyQuery =
- *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
- auto &phyTx = *phyQuery.AddTransactions();
- phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ YQL_ENSURE(SessionCtx->Query().PreparingQuery);
auto promise = NewPromise<TGenericResult>();
const auto ops = GetAlterOperationKinds(&req);
if (ops.size() != 1) {
@@ -571,6 +563,10 @@ public:
const auto opType = *ops.begin();
auto tablePromise = NewPromise<TGenericResult>();
if (opType == EAlterOperationKind::AddIndex) {
+ auto &phyQuery =
+ *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto &phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
auto buildOp = phyTx.MutableSchemeOperation()->MutableBuildOperation();
Ydb::StatusIds::StatusCode code;
TString error;
@@ -602,6 +598,8 @@ public:
auto &phyQuery =
*sessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto &phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+
auto alter = phyTx.MutableSchemeOperation()->MutableAlterTable();
const TPathId invalidPathId;
Ydb::StatusIds::StatusCode code;
@@ -621,6 +619,69 @@ public:
return tablePromise.GetFuture();
}
+ TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster, const TMaybe<TString>& requestType,
+ const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder> &phyTx) override
+ {
+ return Gateway->SendSchemeExecuterRequest(cluster, requestType, phyTx);
+ }
+
+ TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
+ const TMaybe<TString>& requestType, ui64 flags) override
+ {
+ CHECK_PREPARED_DDL(AlterTable);
+
+ auto tablePromise = NewPromise<TGenericResult>();
+
+ if (!IsPrepare()) {
+ SessionCtx->Query().PrepareOnly = false;
+ if (SessionCtx->Query().PreparingQuery) {
+ auto code = Ydb::StatusIds::BAD_REQUEST;
+ auto error = TStringBuilder() << "multiple transactions are not supported for alter table operation.";
+ IKqpGateway::TGenericResult errResult;
+ errResult.AddIssue(NYql::TIssue(error));
+ errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
+ tablePromise.SetValue(errResult);
+ return tablePromise.GetFuture();
+ }
+
+ SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
+ }
+
+ auto prepareFuture = PrepareAlterTable(cluster, std::move(req), requestType, flags);
+ if (IsPrepare())
+ return prepareFuture;
+
+ auto sessionCtx = SessionCtx;
+ auto gateway = Gateway;
+ prepareFuture.Subscribe([cluster, requestType, tablePromise, sessionCtx, gateway](const TFuture<IKqpGateway::TGenericResult> &future) mutable {
+ auto result = future.GetValue();
+ TPreparedQueryHolder::TConstPtr preparedQuery = std::make_shared<TPreparedQueryHolder>(sessionCtx->Query().PreparingQuery.release(), nullptr);
+ if (result.Success()) {
+ auto executeFuture = gateway->SendSchemeExecuterRequest(cluster, requestType, preparedQuery->GetPhyTx(0));
+ executeFuture.Subscribe([tablePromise](const TFuture<IKqpGateway::TGenericResult> &future) mutable {
+ auto fresult = future.GetValue();
+ if (fresult.Success()) {
+ TGenericResult result;
+ result.SetSuccess();
+ tablePromise.SetValue(result);
+ } else {
+ tablePromise.SetValue(
+ ResultFromIssues<TGenericResult>(fresult.Status(), fresult.Issues())
+ );
+ }
+ });
+ return;
+ } else {
+ tablePromise.SetValue(ResultFromIssues<TGenericResult>(
+ result.Status(), result.Issues()));
+
+ return;
+ }
+ });
+
+ return tablePromise.GetFuture();
+ }
+
TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) override {
FORWARD_ENSURE_NO_PREPARE(RenameTable, src, dst, cluster);
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index e6707c99f4..5a5d9a46b8 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -16,6 +16,7 @@
#include <ydb/services/metadata/manager/abstract.h>
#include <ydb/core/kqp/query_data/kqp_query_data.h>
+#include <ydb/core/kqp/query_data/kqp_prepared_query.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/scheme/scheme_types_proto.h>
@@ -28,6 +29,10 @@ namespace NKikimr {
namespace NMiniKQL {
class IFunctionRegistry;
}
+
+ namespace NKqp {
+ class TKqpPhyTxHolder;
+ }
}
namespace NYql {
@@ -767,6 +772,10 @@ public:
virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
+ virtual NThreading::TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster,
+ const TMaybe<TString>& requestType,
+ const std::shared_ptr<const NKikimr::NKqp::TKqpPhyTxHolder> &phyTx) = 0;
+
virtual NThreading::TFuture<TGenericResult> AlterTable(const TString& cluster, Ydb::Table::AlterTableRequest&& req,
const TMaybe<TString>& requestType, ui64 flags) = 0;
diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp
index 4a5a10acd6..0be3943a0e 100644
--- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp
+++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp
@@ -74,6 +74,7 @@ TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPrepar
const auto& txResult = Proto->GetResults(i);
auto& result = TxResultsMeta[i];
+ YQL_ENSURE(Alloc);
result.MkqlItemType = ImportTypeFromProto(txResult.GetItemType(), Alloc->TypeEnv);
//Hack to prevent data race. Side effect of IsPresortSupported - fill cached value.
//So no more concurent write subsequently
@@ -107,9 +108,14 @@ const NKikimr::NKqp::TStagePredictor& TKqpPhyTxHolder::GetCalculationPredictor(c
TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry)
: Proto(proto)
- , Alloc(std::move(std::make_shared<TPreparedQueryAllocHolder>(functionRegistry)))
+ , Alloc(nullptr)
, TableConstInfoById(MakeIntrusive<TTableConstInfoMap>())
{
+
+ if (functionRegistry) {
+ Alloc = std::make_shared<TPreparedQueryAllocHolder>(functionRegistry);
+ }
+
THashSet<TString> tablesSet;
const auto& phyQuery = Proto->GetPhysicalQuery();
Transactions.reserve(phyQuery.TransactionsSize());
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 2b7b4f3d82..26e84891f4 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -139,6 +139,10 @@ public:
return RequestEv->GetSyntax();
}
+ const TString& GetRequestType() const {
+ return RequestEv->GetRequestType();
+ }
+
std::shared_ptr<std::map<TString, Ydb::Type>> GetQueryParameterTypes() const {
return QueryParameterTypes;
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 2f6dcf2f4a..c5d3197cb4 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1040,10 +1040,11 @@ public:
void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) {
auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>();
+ const TString requestType = QueryState ? QueryState->GetRequestType() : TString();
bool temporary = GetTemporaryTableInfo(tx).has_value();
- auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), Settings.Database, userToken,
- QueryState->TxCtx->TxAlloc, temporary, *TempTablesState.SessionId);
+ auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), requestType, Settings.Database, userToken,
+ temporary, *TempTablesState.SessionId, QueryState->UserRequestContext);
ExecuterId = RegisterWithSameMailbox(executerActor);
}
diff --git a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
index be18255b04..e58d78fc33 100644
--- a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
@@ -24,7 +24,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
)");
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}
{
@@ -1031,7 +1031,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
)");
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}
{
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 9cc82eeb8f..cc7dcfd781 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -1151,7 +1151,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
)";
const auto result = session.ExecuteSchemeQuery(query << ";").GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}
{
@@ -1459,13 +1459,13 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
createTable(R"(Interval("-P1D") ON Ts)", EStatus::GENERIC_ERROR, "Interval value cannot be negative");
- createTable(R"(Interval("P1D") ON CreatedAt)", EStatus::GENERIC_ERROR, "Cannot enable TTL on unknown column");
+ createTable(R"(Interval("P1D") ON CreatedAt)", EStatus::SCHEME_ERROR, "Cannot enable TTL on unknown column");
- createTable(R"(Interval("P1D") ON StringValue)", EStatus::GENERIC_ERROR, "Unsupported column type");
+ createTable(R"(Interval("P1D") ON StringValue)", EStatus::SCHEME_ERROR, "Unsupported column type");
- createTable(R"(Interval("P1D") ON Uint32Value)", EStatus::GENERIC_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified");
- createTable(R"(Interval("P1D") ON Uint64Value)", EStatus::GENERIC_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified");
- createTable(R"(Interval("P1D") ON DyNumberValue)", EStatus::GENERIC_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified");
+ createTable(R"(Interval("P1D") ON Uint32Value)", EStatus::SCHEME_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified");
+ createTable(R"(Interval("P1D") ON Uint64Value)", EStatus::SCHEME_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified");
+ createTable(R"(Interval("P1D") ON DyNumberValue)", EStatus::SCHEME_ERROR, "'ValueSinceUnixEpochModeSettings' should be specified");
createTable(R"(Interval("P1D") ON Ts)");
{
@@ -2409,7 +2409,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
)";
auto session = db.CreateSession().GetValueSync().GetSession();
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}
}
@@ -3568,7 +3568,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR,
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST,
result.GetIssues().ToString());
}
}
@@ -4217,7 +4217,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
STORE = COLUMN
);)";
result = session.ExecuteSchemeQuery(query3).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}
Y_UNIT_TEST(CreateAlterDropColumnTableInStore) {
@@ -4358,7 +4358,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}
Y_UNIT_TEST(CreateDropColumnTableNegative) {
@@ -4399,7 +4399,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}
{ // disallow nullable key
@@ -4416,7 +4416,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}
}
diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp
index 97d2b03f33..3659095f71 100644
--- a/ydb/services/ydb/ydb_table_ut.cpp
+++ b/ydb/services/ydb/ydb_table_ut.cpp
@@ -149,7 +149,7 @@ Y_UNIT_TEST_SUITE(YdbYqlClient) {
);
)___").ExtractValueSync();
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
}
{
@@ -161,7 +161,7 @@ Y_UNIT_TEST_SUITE(YdbYqlClient) {
);
)___").ExtractValueSync();
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
}
}
@@ -1147,11 +1147,11 @@ R"___(<main>: Error: Transaction not found: , code: 2015
PRIMARY KEY (Key)
);
)___").ExtractValueSync();
- UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
auto ref = R"___(<main>: Error: Execution, code: 1060
<main>:5:30: Error: Executing CREATE TABLE
- <main>: Error: Scheme operation failed, status: ExecError, reason: Column Key has wrong key type Double
+ <main>: Error: Column Key has wrong key type Double, code: 2003
)___";
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), ref);
}