diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-17 15:56:39 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-17 15:56:39 +0300 |
commit | 645cbbd7ba1cc9b36688acfb6a70afdf591da69e (patch) | |
tree | d8efc5e50158777207092dbd15d1a387bc9f772a | |
parent | 86f741a6d80237415417540b101ad33e8736154b (diff) | |
download | ydb-645cbbd7ba1cc9b36688acfb6a70afdf591da69e.tar.gz |
Use ApplyTableOperations for request checking KIKIMR-14466
ref:a7c6bd91bf3dd34547e5abf46ff9599b741b40fb
-rw-r--r-- | ydb/core/kqp/kqp_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_response.cpp | 84 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 39 |
3 files changed, 66 insertions, 58 deletions
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h index 7d6f172899b..4081965fdaf 100644 --- a/ydb/core/kqp/kqp_impl.h +++ b/ydb/core/kqp/kqp_impl.h @@ -86,6 +86,7 @@ TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const T std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const TActorId& MkqlCompileService); +TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const NYql::TIssue& issue); Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& queryResult); void AddQueryIssues(NKikimrKqp::TQueryResponse& response, const NYql::TIssues& issues); bool HasSchemeOrFatalIssues(const NYql::TIssues& issues); diff --git a/ydb/core/kqp/kqp_response.cpp b/ydb/core/kqp/kqp_response.cpp index d3859cd39c7..3ca33f640e0 100644 --- a/ydb/core/kqp/kqp_response.cpp +++ b/ydb/core/kqp/kqp_response.cpp @@ -9,6 +9,48 @@ using namespace NYql; namespace { +void CollectYdbStatuses(const TIssue& issue, TSet<Ydb::StatusIds::StatusCode>& statuses) { + if (issue.GetSeverity() == TSeverityIds::S_WARNING) { + return; + } + + if (auto status = GetYdbStatus(issue)) { + statuses.insert(*status); + return; + } + + const auto& subIssues = issue.GetSubIssues(); + if (subIssues.empty()) { + statuses.insert(Ydb::StatusIds::GENERIC_ERROR); + } + + for (auto& subIssue : subIssues) { + CollectYdbStatuses(*subIssue, statuses); + } +} + +bool HasSchemeOrFatalIssues(const TIssue& issue) { + if (issue.GetSeverity() == TSeverityIds::S_FATAL) { + return true; + } + + switch (issue.GetCode()) { + case TIssuesIds::KIKIMR_SCHEME_MISMATCH: + case TIssuesIds::KIKIMR_SCHEME_ERROR: + return true; + } + + for (auto& subIssue : issue.GetSubIssues()) { + if (HasSchemeOrFatalIssues(*subIssue)) { + return true; + } + } + + return false; +} + +} // namespace + TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) { if (issue.GetSeverity() == TSeverityIds::S_FATAL) { return Ydb::StatusIds::INTERNAL_ERROR; @@ -72,48 +114,6 @@ TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) { return TMaybe<Ydb::StatusIds::StatusCode>(); } -void CollectYdbStatuses(const TIssue& issue, TSet<Ydb::StatusIds::StatusCode>& statuses) { - if (issue.GetSeverity() == TSeverityIds::S_WARNING) { - return; - } - - if (auto status = GetYdbStatus(issue)) { - statuses.insert(*status); - return; - } - - const auto& subIssues = issue.GetSubIssues(); - if (subIssues.empty()) { - statuses.insert(Ydb::StatusIds::GENERIC_ERROR); - } - - for (auto& subIssue : subIssues) { - CollectYdbStatuses(*subIssue, statuses); - } -} - -bool HasSchemeOrFatalIssues(const TIssue& issue) { - if (issue.GetSeverity() == TSeverityIds::S_FATAL) { - return true; - } - - switch (issue.GetCode()) { - case TIssuesIds::KIKIMR_SCHEME_MISMATCH: - case TIssuesIds::KIKIMR_SCHEME_ERROR: - return true; - } - - for (auto& subIssue : issue.GetSubIssues()) { - if (HasSchemeOrFatalIssues(*subIssue)) { - return true; - } - } - - return false; -} - -} // namespace - Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& queryResult) { if (queryResult.Success()) { return Ydb::StatusIds::SUCCESS; diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 8da1054b857..96353152f9c 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -492,6 +492,18 @@ public: CreateNewTx(); } + static std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { + TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end()); + TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end()); + + auto isolationLevel = *txCtx->EffectiveIsolationLevel; + bool strictDml = true; + + TExprContext ctx; + bool success = txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml, EKikimrQueryType::Dml, ctx); + return {success, ctx.IssueManager.GetIssues()}; + } + void PrepareQueryContext() { YQL_ENSURE(QueryState); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); @@ -524,8 +536,17 @@ public: QueryState->QueryCtx->TimeProvider = TAppData::TimeProvider; QueryState->QueryCtx->RandomProvider = TAppData::RandomProvider; - //const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); - //ApplyTableOperations(QueryState->QueryCtx.Get(), phyQuery); + const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); + auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery); + if (!success) { + YQL_ENSURE(!issues.Empty()); + google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message; + for (const auto& i : issues) { + IssueToMessage(i, message.Add()); + } + ReplyProcessError(requestInfo, *GetYdbStatus(issues.back()), "", &message); + return; + } auto action = queryRequest.GetAction(); auto queryType = queryRequest.GetType(); @@ -885,20 +906,6 @@ public: resStats->MutableCompilation()->Swap(&QueryState->CompileStats); } - /* - void ApplyTableOperations(TKikimrQueryContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { - TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end()); - TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end()); - - auto isolationLevel = *txCtx->EffectiveIsolationLevel; - bool strictDml = true; - - TExprContext ctx; - txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml, - EKikimrQueryType::Dml, ctx); - } - */ - void FillTxInfo(NKikimrKqp::TQueryResponse* response) { Y_VERIFY(QueryState); if (QueryState->TxId) { |