aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-03-31 17:48:20 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-03-31 17:48:20 +0300
commitebd4694e4ea72110b1b632fc348da17827dbca9f (patch)
treea57caa5beadbbe50bd00267aac7155fec91056f4
parent4b7a88e6b075886a1c2d3d27b8150db73b5726b3 (diff)
downloadydb-ebd4694e4ea72110b1b632fc348da17827dbca9f.tar.gz
Refactor ApplyTableOperations
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp15
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.cpp11
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h55
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp14
4 files changed, 44 insertions, 51 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 30cf95a1ff..473d3fb74b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1517,8 +1517,7 @@ private:
return PerformExecution(execQuery, ctx, cluster, settings.Mode, runFunc, finalizeFunc);
}
- bool ApplyTableOperations(const TString& cluster, const TVector<NKqpProto::TKqpTableOp>& tableOps,
- NKikimrKqp::EIsolationLevel isolationLevel, TExprContext& ctx)
+ std::pair<bool, TIssues> ApplyTableOperations(const TString& cluster, const TVector<NKqpProto::TKqpTableOp>& tableOps)
{
bool enableImmediateEffects = SessionCtx->Config().FeatureFlags.GetEnableKqpImmediateEffects();
auto queryType = SessionCtx->Query().Type;
@@ -1534,12 +1533,10 @@ private:
if (!SessionCtx->HasTx()) {
TKikimrTransactionContextBase emptyCtx;
- return emptyCtx.ApplyTableOperations(tableOps, tableInfo, isolationLevel, enableImmediateEffects,
- queryType, ctx);
+ return emptyCtx.ApplyTableOperations(tableOps, tableInfo, enableImmediateEffects, queryType);
}
- return SessionCtx->Tx().ApplyTableOperations(tableOps, tableInfo, isolationLevel,
- enableImmediateEffects, queryType, ctx);
+ return SessionCtx->Tx().ApplyTableOperations(tableOps, tableInfo, enableImmediateEffects, queryType);
}
bool ApplyDdlOperation(const TString& cluster, TPositionHandle pos, const TString& table,
@@ -1555,7 +1552,11 @@ private:
protoOp.SetTable(table);
protoOp.SetOperation((ui32)op);
- return ApplyTableOperations(cluster, {protoOp}, NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE, ctx);
+ auto [success, issues] = ApplyTableOperations(cluster, {protoOp});
+ for (auto& i : issues) {
+ ctx.AddError(std::move(i));
+ }
+ return success;
}
private:
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
index 763da848c7..4f576d1951 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
@@ -555,15 +555,18 @@ bool ValidateTableHasIndex(TKikimrTableMetadataPtr metadata, TExprContext& ctx,
}
bool AddDmlIssue(const TIssue& issue, TExprContext& ctx) {
+ auto newIssue = AddDmlIssue(issue);
+ ctx.AddError(newIssue);
+ return false;
+}
+
+TIssue AddDmlIssue(const TIssue& issue) {
TIssue newIssue;
newIssue.SetCode(issue.GetCode(), ESeverity::TSeverityIds_ESeverityId_S_ERROR);
newIssue.SetMessage("Detected violation of logical DML constraints. YDB transactions don't see their own"
" changes, make sure you perform all table reads before any modifications.");
-
newIssue.AddSubIssue(new TIssue(issue));
-
- ctx.AddError(newIssue);
- return false;
+ return newIssue;
}
TKiDataQueryBlockSettings TKiDataQueryBlockSettings::Parse(const NNodes::TKiDataQueryBlock& node) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 3024b13c81..0c9d4663ef 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -217,6 +217,7 @@ const TYdbOperations& KikimrDataOps();
const TYdbOperations& KikimrModifyOps();
const TYdbOperations& KikimrReadOps();
+TIssue AddDmlIssue(const TIssue& issue);
bool AddDmlIssue(const TIssue& issue, TExprContext& ctx);
class TKikimrTransactionContextBase : public TThrRefBase {
@@ -260,20 +261,16 @@ public:
}
template<class IterableKqpTableOps, class IterableKqpTableInfos>
- bool ApplyTableOperations(const IterableKqpTableOps& operations,
- const IterableKqpTableInfos& tableInfos, NKikimrKqp::EIsolationLevel isolationLevel,
- bool enableImmediateEffects, EKikimrQueryType queryType, TExprContext& ctx)
+ std::pair<bool, TIssues> ApplyTableOperations(const IterableKqpTableOps& operations,
+ const IterableKqpTableInfos& tableInfos, bool enableImmediateEffects, EKikimrQueryType queryType)
{
+ TIssues issues;
if (IsClosed()) {
TString message = TStringBuilder() << "Cannot perform operations on closed transaction.";
- ctx.AddError(YqlIssue({}, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
+ issues.AddIssue(YqlIssue({}, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return {false, issues};
}
- isolationLevel = EffectiveIsolationLevel
- ? *EffectiveIsolationLevel
- : isolationLevel;
-
bool hasScheme = false;
bool hasData = false;
for (auto& pair : TableOperations) {
@@ -299,50 +296,50 @@ public:
if (!info) {
TString message = TStringBuilder()
<< "Unable to find table info for table '" << table << "'";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
- return false;
+ issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
+ return {false, issues};
}
if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in data query";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
+ issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return {false, issues};
}
if (IsIn({EKikimrQueryType::Query, EKikimrQueryType::Script}, queryType) && (newOp & KikimrSchemeOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in query";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
+ issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return {false, issues};
}
if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in scheme query";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
+ issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return {false, issues};
}
if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in scan query";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
+ issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return {false, issues};
}
if (hasData && (newOp & KikimrSchemeOps()) ||
hasScheme && (newOp & KikimrDataOps()))
{
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX));
- return false;
+ issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX));
+ return {false, issues};
}
if (Readonly && (newOp & KikimrModifyOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in read only transaction";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
+ issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return {false, issues};
}
auto& currentOps = TableOperations[table];
@@ -351,22 +348,22 @@ public:
if (KikimrReadOps() & newOp) {
TString message = TStringBuilder() << "Data modifications previously made to table '" << table
<< "' in current transaction won't be seen by operation: '" << newOp << "'";
- if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message), ctx)) {
- return false;
- }
+ auto newIssue = AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message));
+ issues.AddIssue(newIssue);
+ return {false, issues};
}
if (info->GetHasIndexTables()) {
TString message = TStringBuilder() << "Multiple modification of table with secondary indexes is not supported yet";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
+ issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return {false, issues};
}
}
currentOps |= newOp;
}
- return true;
+ return {true, issues};
}
virtual ~TKikimrTransactionContextBase() = default;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index e0d916beaf..3d82733b32 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -569,16 +569,6 @@ public:
Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize());
}
- std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
- auto isolationLevel = *txCtx->EffectiveIsolationLevel;
- bool enableImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects();
-
- TExprContext ctx;
- bool success = txCtx->ApplyTableOperations(query.GetTableOps(), query.GetTableInfos(), isolationLevel,
- enableImmediateEffects, EKikimrQueryType::Dml, ctx);
- return {success, ctx.IssueManager.GetIssues()};
- }
-
bool PrepareQueryTransaction() {
if (QueryState->HasTxControl()) {
const auto& txControl = QueryState->GetTxControl();
@@ -623,7 +613,9 @@ public:
}
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
- auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery);
+ bool enableImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects();
+ auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
+ enableImmediateEffects, EKikimrQueryType::Dml);
if (!success) {
YQL_ENSURE(!issues.Empty());
ReplyQueryError(GetYdbStatus(issues), "", MessageFromIssues(issues));