aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormdartemenko <mdartemenko@yandex-team.com>2022-09-13 12:21:11 +0300
committermdartemenko <mdartemenko@yandex-team.com>2022-09-13 12:21:11 +0300
commit8c1af103661148f5377e712cb2ac5623672522a1 (patch)
tree1cc99be5ead6fea4733e6160870db85b789adffb
parentc4e553829de6b35770af8cf9bc10da708839a0fe (diff)
downloadydb-8c1af103661148f5377e712cb2ac5623672522a1.tar.gz
change ApplyTableOperations params from vectors to templates
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp10
-rw-r--r--ydb/core/kqp/host/kqp_host_impl.h2
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.cpp127
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h173
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h6
6 files changed, 153 insertions, 170 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 063b2ba070a..ec37ab7f42c 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1823,7 +1823,7 @@ private:
TExprContext& ctx)
{
// TODO: Use empty tx context
- TIntrusivePtr<IKikimrTransactionContext> tempTxCtx = MakeIntrusive<TKqpTransactionContext>(true);
+ TIntrusivePtr<TKikimrTransactionContextBase> tempTxCtx = MakeIntrusive<TKqpTransactionContext>(true);
IKikimrQueryExecutor::TExecuteSettings execSettings;
execSettings.UseNewEngine = settings.UseNewEngine;
SetupDataQueryAstTransformer(execSettings, tempTxCtx);
@@ -2167,7 +2167,7 @@ private:
SessionCtx, FillSettings, ExecuteCtx, nullptr, sqlVersion);
}
- void SetupSession(TIntrusivePtr<IKikimrTransactionContext> txCtx) {
+ void SetupSession(TIntrusivePtr<TKikimrTransactionContextBase> txCtx) {
ExprCtx->Reset();
ExprCtx->Step.Done(TExprStep::ExprEval); // KIKIMR-8067
@@ -2183,7 +2183,7 @@ private:
}
}
- void SetupYqlTransformer(TIntrusivePtr<IKikimrTransactionContext> txCtx) {
+ void SetupYqlTransformer(TIntrusivePtr<TKikimrTransactionContextBase> txCtx) {
SetupSession(txCtx);
YqlTransformer->Rewind();
@@ -2193,7 +2193,7 @@ private:
}
void SetupDataQueryAstTransformer(const IKikimrQueryExecutor::TExecuteSettings& settings,
- TIntrusivePtr<IKikimrTransactionContext> txCtx = {})
+ TIntrusivePtr<TKikimrTransactionContextBase> txCtx = {})
{
SetupSession(txCtx);
@@ -2203,7 +2203,7 @@ private:
}
void SetupExecutePreparedTransformer(const IKikimrQueryExecutor::TExecuteSettings& settings,
- TIntrusivePtr<IKikimrTransactionContext> txCtx)
+ TIntrusivePtr<TKikimrTransactionContextBase> txCtx)
{
SetupSession(txCtx);
diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h
index 51226338b30..d000655c9da 100644
--- a/ydb/core/kqp/host/kqp_host_impl.h
+++ b/ydb/core/kqp/host/kqp_host_impl.h
@@ -229,7 +229,7 @@ template<typename TResult, bool copyIssues = true>
class TKqpAsyncExecuteResultBase : public TKqpAsyncResultBase<TResult, copyIssues> {
public:
TKqpAsyncExecuteResultBase(const NYql::TExprNode::TPtr& exprRoot, NYql::TExprContext& exprCtx,
- NYql::IGraphTransformer& transformer, TIntrusivePtr<NYql::IKikimrTransactionContext> txCtx)
+ NYql::IGraphTransformer& transformer, TIntrusivePtr<NYql::TKikimrTransactionContextBase> txCtx)
: TKqpAsyncResultBase<TResult, copyIssues>(exprRoot, exprCtx, transformer,
[txCtx](const auto& status) {
if (txCtx) {
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index abdb105a5c7..c18ffb02f53 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -676,14 +676,11 @@ public:
}
std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
- TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end());
- TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end());
-
auto isolationLevel = *txCtx->EffectiveIsolationLevel;
bool strictDml = Config->StrictDml.Get(Settings.Cluster).GetOrElse(false);
TExprContext ctx;
- bool success = txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml, EKikimrQueryType::Dml, ctx);
+ bool success = txCtx->ApplyTableOperations(query.GetTableOps(), query.GetTableInfos(), isolationLevel, strictDml, EKikimrQueryType::Dml, ctx);
return {success, ctx.IssueManager.GetIssues()};
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
index eb492ccef0c..59d155eb9dd 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
@@ -569,133 +569,6 @@ void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperat
TableDescriptionToTableInfoImpl(desc, op, std::back_inserter(infos));
}
-bool TKikimrTransactionContextBase::ApplyTableOperations(const TVector<NKqpProto::TKqpTableOp>& operations,
- const TVector<NKqpProto::TKqpTableInfo>& tableInfos, NKikimrKqp::EIsolationLevel isolationLevel, bool strictDml,
- EKikimrQueryType queryType, TExprContext& ctx)
-{
- if (IsClosed()) {
- TString message = TStringBuilder() << "Cannot perform operations on closed transaction.";
- ctx.AddError(YqlIssue({}, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
- isolationLevel = EffectiveIsolationLevel
- ? *EffectiveIsolationLevel
- : isolationLevel;
-
- bool hasScheme = false;
- bool hasData = false;
- for (auto& pair : TableOperations) {
- hasScheme = hasScheme || (pair.second & KikimrSchemeOps());
- hasData = hasData || (pair.second & KikimrDataOps());
- }
-
- THashMap<TString, NKqpProto::TKqpTableInfo> tableInfoMap;
- for (const auto& info : tableInfos) {
- tableInfoMap.insert(std::make_pair(info.GetTableName(), info));
-
- TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
- TableByIdMap.insert(std::make_pair(pathId, info.GetTableName()));
- }
-
- for (const auto& op : operations) {
- const auto& table = op.GetTable();
-
- auto newOp = TYdbOperation(op.GetOperation());
- TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
-
- const auto info = tableInfoMap.FindPtr(table);
- if (!info) {
- TString message = TStringBuilder()
- << "Unable to find table info for table '" << table << "'";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
- return false;
- }
-
- if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) {
- TString message = TStringBuilder() << "Operation '" << newOp
- << "' can't be performed in data query";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
- if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) {
- TString message = TStringBuilder() << "Operation '" << newOp
- << "' can't be performed in scheme query";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
- if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) {
- TString message = TStringBuilder() << "Operation '" << newOp
- << "' can't be performed in scan query";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
- if (hasData && (newOp & KikimrSchemeOps()) ||
- hasScheme && (newOp & KikimrDataOps()))
- {
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX));
- return false;
- }
-
- if (Readonly && (newOp & KikimrModifyOps())) {
- TString message = TStringBuilder() << "Operation '" << newOp
- << "' can't be performed in read only transaction";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
- auto& currentOps = TableOperations[table];
-
- if (currentOps & KikimrModifyOps()) {
- if (KikimrRequireUnmodifiedOps() & newOp) {
- TString message = TStringBuilder() << "Operation '" << newOp
- << "' can't be performed on previously modified table: " << table;
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
- if (KikimrReadOps() & newOp) {
- TString message = TStringBuilder() << "Data modifications previously made to table '" << table
- << "' in current transaction won't be seen by operation: '" << newOp << "'";
- if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message), strictDml, ctx)) {
- return false;
- }
- }
-
- if (info->GetHasIndexTables()) {
- TString message = TStringBuilder() << "Multiple modification of table with secondary indexes is not supported yet";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
- }
-
- if ((KikimrRequireUnmodifiedOps() & newOp) && isolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
- TString message = TStringBuilder()
- << "Operation '" << newOp << "' is only supported with SERIALIZABLE isolation level";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
- // TODO: KIKIMR-3206
- bool currentDelete = currentOps & (TYdbOperation::Delete | TYdbOperation::DeleteOn);
- bool newUpdate = newOp == TYdbOperation::Update;
- if (currentDelete && newUpdate) {
- TString message = TStringBuilder() << "Operation '" << newOp
- << "' may lead to unexpected results when applied to table with deleted rows: " << table;
- if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_UPDATE_TABLE_WITH_DELETES, message), strictDml, ctx)) {
- return false;
- }
- }
-
- currentOps |= newOp;
- }
-
- return true;
-}
-
const THashSet<TStringBuf>& KikimrDataSourceFunctions() {
return Singleton<TKikimrData>()->DataSourceNames;
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 523387bbf38..36ce2160110 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -260,22 +260,15 @@ enum class TYdbOperation : ui32 {
Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation)
Y_DECLARE_OPERATORS_FOR_FLAGS(TYdbOperations)
-class IKikimrTransactionContext : public TThrRefBase {
-public:
- virtual void Invalidate() = 0;
- virtual void Finish() = 0;
-
- virtual bool IsInvalidated() const = 0;
- virtual bool IsClosed() const = 0;
-
- virtual bool ApplyTableOperations(const TVector<NKqpProto::TKqpTableOp>& operations,
- const TVector<NKqpProto::TKqpTableInfo>& tableInfo, NKikimrKqp::EIsolationLevel isolationLevel,
- bool strictDml, EKikimrQueryType queryType, TExprContext& ctx) = 0;
+const TYdbOperations& KikimrSchemeOps();
+const TYdbOperations& KikimrDataOps();
+const TYdbOperations& KikimrModifyOps();
+const TYdbOperations& KikimrReadOps();
+const TYdbOperations& KikimrRequireUnmodifiedOps();
- virtual ~IKikimrTransactionContext() = default;
-};
+bool AddDmlIssue(const TIssue& issue, bool strictDml, TExprContext& ctx);
-class TKikimrTransactionContextBase : public IKikimrTransactionContext {
+class TKikimrTransactionContextBase : public TThrRefBase {
public:
THashMap<TString, TYdbOperations> TableOperations;
THashMap<TKikimrPathId, TString> TableByIdMap;
@@ -288,19 +281,19 @@ public:
return EffectiveIsolationLevel.Defined();
}
- bool IsInvalidated() const override {
+ bool IsInvalidated() const {
return Invalidated;
}
- bool IsClosed() const override {
+ bool IsClosed() const {
return Closed;
}
- void Finish() override {
+ virtual void Finish() {
Closed = true;
}
- void Invalidate() override {
+ void Invalidate() {
if (HasStarted()) {
Invalidated = true;
}
@@ -315,13 +308,141 @@ public:
Closed = false;
}
- bool ApplyTableOperations(const TVector<NKqpProto::TKqpTableOp>& operations, const TVector<NKqpProto::TKqpTableInfo>& tableInfo,
- NKikimrKqp::EIsolationLevel isolationLevel, bool strictDml, EKikimrQueryType queryType, TExprContext& ctx) override;
+ template<class IterableKqpTableOps, class IterableKqpTableInfos>
+ bool ApplyTableOperations(const IterableKqpTableOps& operations,
+ const IterableKqpTableInfos& tableInfos, NKikimrKqp::EIsolationLevel isolationLevel, bool strictDml,
+ EKikimrQueryType queryType, TExprContext& ctx)
+ {
+ if (IsClosed()) {
+ TString message = TStringBuilder() << "Cannot perform operations on closed transaction.";
+ ctx.AddError(YqlIssue({}, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+
+ isolationLevel = EffectiveIsolationLevel
+ ? *EffectiveIsolationLevel
+ : isolationLevel;
+
+ bool hasScheme = false;
+ bool hasData = false;
+ for (auto& pair : TableOperations) {
+ hasScheme = hasScheme || (pair.second & KikimrSchemeOps());
+ hasData = hasData || (pair.second & KikimrDataOps());
+ }
+
+ THashMap<TString, NKqpProto::TKqpTableInfo> tableInfoMap;
+ for (const auto& info : tableInfos) {
+ tableInfoMap.insert(std::make_pair(info.GetTableName(), info));
+
+ TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
+ TableByIdMap.insert(std::make_pair(pathId, info.GetTableName()));
+ }
+
+ for (const auto& op : operations) {
+ const auto& table = op.GetTable();
+
+ auto newOp = TYdbOperation(op.GetOperation());
+ TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
+
+ const auto info = tableInfoMap.FindPtr(table);
+ if (!info) {
+ TString message = TStringBuilder()
+ << "Unable to find table info for table '" << table << "'";
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
+ return false;
+ }
+
+ if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) {
+ TString message = TStringBuilder() << "Operation '" << newOp
+ << "' can't be performed in data query";
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+
+ if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) {
+ TString message = TStringBuilder() << "Operation '" << newOp
+ << "' can't be performed in scheme query";
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+
+ if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) {
+ TString message = TStringBuilder() << "Operation '" << newOp
+ << "' can't be performed in scan query";
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+
+ if (hasData && (newOp & KikimrSchemeOps()) ||
+ hasScheme && (newOp & KikimrDataOps()))
+ {
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX));
+ return false;
+ }
+
+ if (Readonly && (newOp & KikimrModifyOps())) {
+ TString message = TStringBuilder() << "Operation '" << newOp
+ << "' can't be performed in read only transaction";
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+
+ auto& currentOps = TableOperations[table];
+
+ if (currentOps & KikimrModifyOps()) {
+ if (KikimrRequireUnmodifiedOps() & newOp) {
+ TString message = TStringBuilder() << "Operation '" << newOp
+ << "' can't be performed on previously modified table: " << table;
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+
+ if (KikimrReadOps() & newOp) {
+ TString message = TStringBuilder() << "Data modifications previously made to table '" << table
+ << "' in current transaction won't be seen by operation: '" << newOp << "'";
+ if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message), strictDml, ctx)) {
+ return false;
+ }
+ }
+
+ if (info->GetHasIndexTables()) {
+ TString message = TStringBuilder() << "Multiple modification of table with secondary indexes is not supported yet";
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+ }
+
+ if ((KikimrRequireUnmodifiedOps() & newOp) && isolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
+ TString message = TStringBuilder()
+ << "Operation '" << newOp << "' is only supported with SERIALIZABLE isolation level";
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+
+ // TODO: KIKIMR-3206
+ bool currentDelete = currentOps & (TYdbOperation::Delete | TYdbOperation::DeleteOn);
+ bool newUpdate = newOp == TYdbOperation::Update;
+ if (currentDelete && newUpdate) {
+ TString message = TStringBuilder() << "Operation '" << newOp
+ << "' may lead to unexpected results when applied to table with deleted rows: " << table;
+ if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_UPDATE_TABLE_WITH_DELETES, message), strictDml, ctx)) {
+ return false;
+ }
+ }
+
+ currentOps |= newOp;
+ }
+
+ return true;
+ }
+
+ virtual ~TKikimrTransactionContextBase() = default;
+
};
class TKikimrSessionContext : public TThrRefBase {
public:
- TKikimrSessionContext(TKikimrConfiguration::TPtr config, TIntrusivePtr<IKikimrTransactionContext> txCtx = nullptr)
+ TKikimrSessionContext(TKikimrConfiguration::TPtr config, TIntrusivePtr<TKikimrTransactionContextBase> txCtx = nullptr)
: Configuration(config)
, TablesData(MakeIntrusive<TKikimrTablesData>())
, QueryCtx(MakeIntrusive<TKikimrQueryContext>())
@@ -333,16 +454,16 @@ public:
TKikimrConfiguration& Config() { return *Configuration; }
TKikimrTablesData& Tables() { return *TablesData; }
TKikimrQueryContext& Query() { return *QueryCtx; }
- IKikimrTransactionContext& Tx() { Y_VERIFY(HasTx()); return *TxCtx; }
+ TKikimrTransactionContextBase& Tx() { Y_VERIFY(HasTx()); return *TxCtx; }
TKikimrConfiguration::TPtr ConfigPtr() { return Configuration; }
TIntrusivePtr<TKikimrTablesData> TablesPtr() { return TablesData; }
TIntrusivePtr<TKikimrQueryContext> QueryPtr() { return QueryCtx; }
- TIntrusivePtr<IKikimrTransactionContext> TxPtr() { return TxCtx; }
+ TIntrusivePtr<TKikimrTransactionContextBase> TxPtr() { return TxCtx; }
bool HasTx() const { return !!TxCtx; }
void ClearTx() { TxCtx.Reset(); }
- void SetTx(TIntrusivePtr<IKikimrTransactionContext>& txCtx) { TxCtx.Reset(txCtx); }
+ void SetTx(TIntrusivePtr<TKikimrTransactionContextBase>& txCtx) { TxCtx.Reset(txCtx); }
TString GetUserName() const {
return UserName;
@@ -376,11 +497,9 @@ private:
TKikimrConfiguration::TPtr Configuration;
TIntrusivePtr<TKikimrTablesData> TablesData;
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
- TIntrusivePtr<IKikimrTransactionContext> TxCtx;
+ TIntrusivePtr<TKikimrTransactionContextBase> TxCtx;
};
-bool AddDmlIssue(const TIssue& issue, bool strictDml, TExprContext& ctx);
-
TIntrusivePtr<IDataProvider> CreateKikimrDataSource(
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
TTypeAnnotationContext& types,
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index 43e710002e5..db334d30618 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -272,12 +272,6 @@ const TStringBuf& KikimrCommitModeFlush();
const TStringBuf& KikimrCommitModeRollback();
const TStringBuf& KikimrCommitModeScheme();
-const TYdbOperations& KikimrSchemeOps();
-const TYdbOperations& KikimrDataOps();
-const TYdbOperations& KikimrModifyOps();
-const TYdbOperations& KikimrReadOps();
-const TYdbOperations& KikimrRequireUnmodifiedOps();
-
const TMap<TString, NKikimr::NUdf::EDataSlot>& KikimrSystemColumns();
bool IsKikimrSystemColumn(const TStringBuf columnName);