aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-17 15:56:39 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-17 15:56:39 +0300
commit645cbbd7ba1cc9b36688acfb6a70afdf591da69e (patch)
treed8efc5e50158777207092dbd15d1a387bc9f772a
parent86f741a6d80237415417540b101ad33e8736154b (diff)
downloadydb-645cbbd7ba1cc9b36688acfb6a70afdf591da69e.tar.gz
Use ApplyTableOperations for request checking KIKIMR-14466
ref:a7c6bd91bf3dd34547e5abf46ff9599b741b40fb
-rw-r--r--ydb/core/kqp/kqp_impl.h1
-rw-r--r--ydb/core/kqp/kqp_response.cpp84
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp39
3 files changed, 66 insertions, 58 deletions
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h
index 7d6f172899b..4081965fdaf 100644
--- a/ydb/core/kqp/kqp_impl.h
+++ b/ydb/core/kqp/kqp_impl.h
@@ -86,6 +86,7 @@ TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const T
std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters,
const TActorId& MkqlCompileService);
+TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const NYql::TIssue& issue);
Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& queryResult);
void AddQueryIssues(NKikimrKqp::TQueryResponse& response, const NYql::TIssues& issues);
bool HasSchemeOrFatalIssues(const NYql::TIssues& issues);
diff --git a/ydb/core/kqp/kqp_response.cpp b/ydb/core/kqp/kqp_response.cpp
index d3859cd39c7..3ca33f640e0 100644
--- a/ydb/core/kqp/kqp_response.cpp
+++ b/ydb/core/kqp/kqp_response.cpp
@@ -9,6 +9,48 @@ using namespace NYql;
namespace {
+void CollectYdbStatuses(const TIssue& issue, TSet<Ydb::StatusIds::StatusCode>& statuses) {
+ if (issue.GetSeverity() == TSeverityIds::S_WARNING) {
+ return;
+ }
+
+ if (auto status = GetYdbStatus(issue)) {
+ statuses.insert(*status);
+ return;
+ }
+
+ const auto& subIssues = issue.GetSubIssues();
+ if (subIssues.empty()) {
+ statuses.insert(Ydb::StatusIds::GENERIC_ERROR);
+ }
+
+ for (auto& subIssue : subIssues) {
+ CollectYdbStatuses(*subIssue, statuses);
+ }
+}
+
+bool HasSchemeOrFatalIssues(const TIssue& issue) {
+ if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
+ return true;
+ }
+
+ switch (issue.GetCode()) {
+ case TIssuesIds::KIKIMR_SCHEME_MISMATCH:
+ case TIssuesIds::KIKIMR_SCHEME_ERROR:
+ return true;
+ }
+
+ for (auto& subIssue : issue.GetSubIssues()) {
+ if (HasSchemeOrFatalIssues(*subIssue)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+} // namespace
+
TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) {
if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
return Ydb::StatusIds::INTERNAL_ERROR;
@@ -72,48 +114,6 @@ TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) {
return TMaybe<Ydb::StatusIds::StatusCode>();
}
-void CollectYdbStatuses(const TIssue& issue, TSet<Ydb::StatusIds::StatusCode>& statuses) {
- if (issue.GetSeverity() == TSeverityIds::S_WARNING) {
- return;
- }
-
- if (auto status = GetYdbStatus(issue)) {
- statuses.insert(*status);
- return;
- }
-
- const auto& subIssues = issue.GetSubIssues();
- if (subIssues.empty()) {
- statuses.insert(Ydb::StatusIds::GENERIC_ERROR);
- }
-
- for (auto& subIssue : subIssues) {
- CollectYdbStatuses(*subIssue, statuses);
- }
-}
-
-bool HasSchemeOrFatalIssues(const TIssue& issue) {
- if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
- return true;
- }
-
- switch (issue.GetCode()) {
- case TIssuesIds::KIKIMR_SCHEME_MISMATCH:
- case TIssuesIds::KIKIMR_SCHEME_ERROR:
- return true;
- }
-
- for (auto& subIssue : issue.GetSubIssues()) {
- if (HasSchemeOrFatalIssues(*subIssue)) {
- return true;
- }
- }
-
- return false;
-}
-
-} // namespace
-
Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& queryResult) {
if (queryResult.Success()) {
return Ydb::StatusIds::SUCCESS;
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 8da1054b857..96353152f9c 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -492,6 +492,18 @@ public:
CreateNewTx();
}
+ static std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
+ TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end());
+ TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end());
+
+ auto isolationLevel = *txCtx->EffectiveIsolationLevel;
+ bool strictDml = true;
+
+ TExprContext ctx;
+ bool success = txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml, EKikimrQueryType::Dml, ctx);
+ return {success, ctx.IssueManager.GetIssues()};
+ }
+
void PrepareQueryContext() {
YQL_ENSURE(QueryState);
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
@@ -524,8 +536,17 @@ public:
QueryState->QueryCtx->TimeProvider = TAppData::TimeProvider;
QueryState->QueryCtx->RandomProvider = TAppData::RandomProvider;
- //const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
- //ApplyTableOperations(QueryState->QueryCtx.Get(), phyQuery);
+ const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
+ auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery);
+ if (!success) {
+ YQL_ENSURE(!issues.Empty());
+ google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message;
+ for (const auto& i : issues) {
+ IssueToMessage(i, message.Add());
+ }
+ ReplyProcessError(requestInfo, *GetYdbStatus(issues.back()), "", &message);
+ return;
+ }
auto action = queryRequest.GetAction();
auto queryType = queryRequest.GetType();
@@ -885,20 +906,6 @@ public:
resStats->MutableCompilation()->Swap(&QueryState->CompileStats);
}
- /*
- void ApplyTableOperations(TKikimrQueryContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
- TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end());
- TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end());
-
- auto isolationLevel = *txCtx->EffectiveIsolationLevel;
- bool strictDml = true;
-
- TExprContext ctx;
- txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml,
- EKikimrQueryType::Dml, ctx);
- }
- */
-
void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
Y_VERIFY(QueryState);
if (QueryState->TxId) {