diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-31 17:48:20 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-31 17:48:20 +0300 |
commit | ebd4694e4ea72110b1b632fc348da17827dbca9f (patch) | |
tree | a57caa5beadbbe50bd00267aac7155fec91056f4 | |
parent | 4b7a88e6b075886a1c2d3d27b8150db73b5726b3 (diff) | |
download | ydb-ebd4694e4ea72110b1b632fc348da17827dbca9f.tar.gz |
Refactor ApplyTableOperations
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.h | 55 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 14 |
4 files changed, 44 insertions, 51 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 30cf95a1ff..473d3fb74b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1517,8 +1517,7 @@ private: return PerformExecution(execQuery, ctx, cluster, settings.Mode, runFunc, finalizeFunc); } - bool ApplyTableOperations(const TString& cluster, const TVector<NKqpProto::TKqpTableOp>& tableOps, - NKikimrKqp::EIsolationLevel isolationLevel, TExprContext& ctx) + std::pair<bool, TIssues> ApplyTableOperations(const TString& cluster, const TVector<NKqpProto::TKqpTableOp>& tableOps) { bool enableImmediateEffects = SessionCtx->Config().FeatureFlags.GetEnableKqpImmediateEffects(); auto queryType = SessionCtx->Query().Type; @@ -1534,12 +1533,10 @@ private: if (!SessionCtx->HasTx()) { TKikimrTransactionContextBase emptyCtx; - return emptyCtx.ApplyTableOperations(tableOps, tableInfo, isolationLevel, enableImmediateEffects, - queryType, ctx); + return emptyCtx.ApplyTableOperations(tableOps, tableInfo, enableImmediateEffects, queryType); } - return SessionCtx->Tx().ApplyTableOperations(tableOps, tableInfo, isolationLevel, - enableImmediateEffects, queryType, ctx); + return SessionCtx->Tx().ApplyTableOperations(tableOps, tableInfo, enableImmediateEffects, queryType); } bool ApplyDdlOperation(const TString& cluster, TPositionHandle pos, const TString& table, @@ -1555,7 +1552,11 @@ private: protoOp.SetTable(table); protoOp.SetOperation((ui32)op); - return ApplyTableOperations(cluster, {protoOp}, NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE, ctx); + auto [success, issues] = ApplyTableOperations(cluster, {protoOp}); + for (auto& i : issues) { + ctx.AddError(std::move(i)); + } + return success; } private: diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index 763da848c7..4f576d1951 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -555,15 +555,18 @@ bool ValidateTableHasIndex(TKikimrTableMetadataPtr metadata, TExprContext& ctx, } bool AddDmlIssue(const TIssue& issue, TExprContext& ctx) { + auto newIssue = AddDmlIssue(issue); + ctx.AddError(newIssue); + return false; +} + +TIssue AddDmlIssue(const TIssue& issue) { TIssue newIssue; newIssue.SetCode(issue.GetCode(), ESeverity::TSeverityIds_ESeverityId_S_ERROR); newIssue.SetMessage("Detected violation of logical DML constraints. YDB transactions don't see their own" " changes, make sure you perform all table reads before any modifications."); - newIssue.AddSubIssue(new TIssue(issue)); - - ctx.AddError(newIssue); - return false; + return newIssue; } TKiDataQueryBlockSettings TKiDataQueryBlockSettings::Parse(const NNodes::TKiDataQueryBlock& node) { diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 3024b13c81..0c9d4663ef 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -217,6 +217,7 @@ const TYdbOperations& KikimrDataOps(); const TYdbOperations& KikimrModifyOps(); const TYdbOperations& KikimrReadOps(); +TIssue AddDmlIssue(const TIssue& issue); bool AddDmlIssue(const TIssue& issue, TExprContext& ctx); class TKikimrTransactionContextBase : public TThrRefBase { @@ -260,20 +261,16 @@ public: } template<class IterableKqpTableOps, class IterableKqpTableInfos> - bool ApplyTableOperations(const IterableKqpTableOps& operations, - const IterableKqpTableInfos& tableInfos, NKikimrKqp::EIsolationLevel isolationLevel, - bool enableImmediateEffects, EKikimrQueryType queryType, TExprContext& ctx) + std::pair<bool, TIssues> ApplyTableOperations(const IterableKqpTableOps& operations, + const IterableKqpTableInfos& tableInfos, bool enableImmediateEffects, EKikimrQueryType queryType) { + TIssues issues; if (IsClosed()) { TString message = TStringBuilder() << "Cannot perform operations on closed transaction."; - ctx.AddError(YqlIssue({}, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; + issues.AddIssue(YqlIssue({}, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return {false, issues}; } - isolationLevel = EffectiveIsolationLevel - ? *EffectiveIsolationLevel - : isolationLevel; - bool hasScheme = false; bool hasData = false; for (auto& pair : TableOperations) { @@ -299,50 +296,50 @@ public: if (!info) { TString message = TStringBuilder() << "Unable to find table info for table '" << table << "'"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message)); - return false; + issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message)); + return {false, issues}; } if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in data query"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; + issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return {false, issues}; } if (IsIn({EKikimrQueryType::Query, EKikimrQueryType::Script}, queryType) && (newOp & KikimrSchemeOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in query"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; + issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return {false, issues}; } if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in scheme query"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; + issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return {false, issues}; } if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in scan query"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; + issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return {false, issues}; } if (hasData && (newOp & KikimrSchemeOps()) || hasScheme && (newOp & KikimrDataOps())) { - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX)); - return false; + issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX)); + return {false, issues}; } if (Readonly && (newOp & KikimrModifyOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in read only transaction"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; + issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return {false, issues}; } auto& currentOps = TableOperations[table]; @@ -351,22 +348,22 @@ public: if (KikimrReadOps() & newOp) { TString message = TStringBuilder() << "Data modifications previously made to table '" << table << "' in current transaction won't be seen by operation: '" << newOp << "'"; - if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message), ctx)) { - return false; - } + auto newIssue = AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message)); + issues.AddIssue(newIssue); + return {false, issues}; } if (info->GetHasIndexTables()) { TString message = TStringBuilder() << "Multiple modification of table with secondary indexes is not supported yet"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; + issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return {false, issues}; } } currentOps |= newOp; } - return true; + return {true, issues}; } virtual ~TKikimrTransactionContextBase() = default; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index e0d916beaf..3d82733b32 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -569,16 +569,6 @@ public: Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize()); } - std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { - auto isolationLevel = *txCtx->EffectiveIsolationLevel; - bool enableImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects(); - - TExprContext ctx; - bool success = txCtx->ApplyTableOperations(query.GetTableOps(), query.GetTableInfos(), isolationLevel, - enableImmediateEffects, EKikimrQueryType::Dml, ctx); - return {success, ctx.IssueManager.GetIssues()}; - } - bool PrepareQueryTransaction() { if (QueryState->HasTxControl()) { const auto& txControl = QueryState->GetTxControl(); @@ -623,7 +613,9 @@ public: } const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); - auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery); + bool enableImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects(); + auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(), + enableImmediateEffects, EKikimrQueryType::Dml); if (!success) { YQL_ENSURE(!issues.Empty()); ReplyQueryError(GetYdbStatus(issues), "", MessageFromIssues(issues)); |