diff options
author | mdartemenko <mdartemenko@yandex-team.com> | 2022-09-13 12:21:11 +0300 |
---|---|---|
committer | mdartemenko <mdartemenko@yandex-team.com> | 2022-09-13 12:21:11 +0300 |
commit | 8c1af103661148f5377e712cb2ac5623672522a1 (patch) | |
tree | 1cc99be5ead6fea4733e6160870db85b789adffb | |
parent | c4e553829de6b35770af8cf9bc10da708839a0fe (diff) | |
download | ydb-8c1af103661148f5377e712cb2ac5623672522a1.tar.gz |
change ApplyTableOperations params from vectors to templates
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.cpp | 127 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.h | 173 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider_impl.h | 6 |
6 files changed, 153 insertions, 170 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 063b2ba070a..ec37ab7f42c 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1823,7 +1823,7 @@ private: TExprContext& ctx) { // TODO: Use empty tx context - TIntrusivePtr<IKikimrTransactionContext> tempTxCtx = MakeIntrusive<TKqpTransactionContext>(true); + TIntrusivePtr<TKikimrTransactionContextBase> tempTxCtx = MakeIntrusive<TKqpTransactionContext>(true); IKikimrQueryExecutor::TExecuteSettings execSettings; execSettings.UseNewEngine = settings.UseNewEngine; SetupDataQueryAstTransformer(execSettings, tempTxCtx); @@ -2167,7 +2167,7 @@ private: SessionCtx, FillSettings, ExecuteCtx, nullptr, sqlVersion); } - void SetupSession(TIntrusivePtr<IKikimrTransactionContext> txCtx) { + void SetupSession(TIntrusivePtr<TKikimrTransactionContextBase> txCtx) { ExprCtx->Reset(); ExprCtx->Step.Done(TExprStep::ExprEval); // KIKIMR-8067 @@ -2183,7 +2183,7 @@ private: } } - void SetupYqlTransformer(TIntrusivePtr<IKikimrTransactionContext> txCtx) { + void SetupYqlTransformer(TIntrusivePtr<TKikimrTransactionContextBase> txCtx) { SetupSession(txCtx); YqlTransformer->Rewind(); @@ -2193,7 +2193,7 @@ private: } void SetupDataQueryAstTransformer(const IKikimrQueryExecutor::TExecuteSettings& settings, - TIntrusivePtr<IKikimrTransactionContext> txCtx = {}) + TIntrusivePtr<TKikimrTransactionContextBase> txCtx = {}) { SetupSession(txCtx); @@ -2203,7 +2203,7 @@ private: } void SetupExecutePreparedTransformer(const IKikimrQueryExecutor::TExecuteSettings& settings, - TIntrusivePtr<IKikimrTransactionContext> txCtx) + TIntrusivePtr<TKikimrTransactionContextBase> txCtx) { SetupSession(txCtx); diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h index 51226338b30..d000655c9da 100644 --- a/ydb/core/kqp/host/kqp_host_impl.h +++ b/ydb/core/kqp/host/kqp_host_impl.h @@ -229,7 +229,7 @@ template<typename TResult, bool copyIssues = true> class TKqpAsyncExecuteResultBase : public TKqpAsyncResultBase<TResult, copyIssues> { public: TKqpAsyncExecuteResultBase(const NYql::TExprNode::TPtr& exprRoot, NYql::TExprContext& exprCtx, - NYql::IGraphTransformer& transformer, TIntrusivePtr<NYql::IKikimrTransactionContext> txCtx) + NYql::IGraphTransformer& transformer, TIntrusivePtr<NYql::TKikimrTransactionContextBase> txCtx) : TKqpAsyncResultBase<TResult, copyIssues>(exprRoot, exprCtx, transformer, [txCtx](const auto& status) { if (txCtx) { diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index abdb105a5c7..c18ffb02f53 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -676,14 +676,11 @@ public: } std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { - TVector<NKqpProto::TKqpTableOp> operations(query.GetTableOps().begin(), query.GetTableOps().end()); - TVector<NKqpProto::TKqpTableInfo> tableInfos(query.GetTableInfos().begin(), query.GetTableInfos().end()); - auto isolationLevel = *txCtx->EffectiveIsolationLevel; bool strictDml = Config->StrictDml.Get(Settings.Cluster).GetOrElse(false); TExprContext ctx; - bool success = txCtx->ApplyTableOperations(operations, tableInfos, isolationLevel, strictDml, EKikimrQueryType::Dml, ctx); + bool success = txCtx->ApplyTableOperations(query.GetTableOps(), query.GetTableInfos(), isolationLevel, strictDml, EKikimrQueryType::Dml, ctx); return {success, ctx.IssueManager.GetIssues()}; } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index eb492ccef0c..59d155eb9dd 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -569,133 +569,6 @@ void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperat TableDescriptionToTableInfoImpl(desc, op, std::back_inserter(infos)); } -bool TKikimrTransactionContextBase::ApplyTableOperations(const TVector<NKqpProto::TKqpTableOp>& operations, - const TVector<NKqpProto::TKqpTableInfo>& tableInfos, NKikimrKqp::EIsolationLevel isolationLevel, bool strictDml, - EKikimrQueryType queryType, TExprContext& ctx) -{ - if (IsClosed()) { - TString message = TStringBuilder() << "Cannot perform operations on closed transaction."; - ctx.AddError(YqlIssue({}, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - - isolationLevel = EffectiveIsolationLevel - ? *EffectiveIsolationLevel - : isolationLevel; - - bool hasScheme = false; - bool hasData = false; - for (auto& pair : TableOperations) { - hasScheme = hasScheme || (pair.second & KikimrSchemeOps()); - hasData = hasData || (pair.second & KikimrDataOps()); - } - - THashMap<TString, NKqpProto::TKqpTableInfo> tableInfoMap; - for (const auto& info : tableInfos) { - tableInfoMap.insert(std::make_pair(info.GetTableName(), info)); - - TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId()); - TableByIdMap.insert(std::make_pair(pathId, info.GetTableName())); - } - - for (const auto& op : operations) { - const auto& table = op.GetTable(); - - auto newOp = TYdbOperation(op.GetOperation()); - TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); - - const auto info = tableInfoMap.FindPtr(table); - if (!info) { - TString message = TStringBuilder() - << "Unable to find table info for table '" << table << "'"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message)); - return false; - } - - if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) { - TString message = TStringBuilder() << "Operation '" << newOp - << "' can't be performed in data query"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - - if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) { - TString message = TStringBuilder() << "Operation '" << newOp - << "' can't be performed in scheme query"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - - if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) { - TString message = TStringBuilder() << "Operation '" << newOp - << "' can't be performed in scan query"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - - if (hasData && (newOp & KikimrSchemeOps()) || - hasScheme && (newOp & KikimrDataOps())) - { - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX)); - return false; - } - - if (Readonly && (newOp & KikimrModifyOps())) { - TString message = TStringBuilder() << "Operation '" << newOp - << "' can't be performed in read only transaction"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - - auto& currentOps = TableOperations[table]; - - if (currentOps & KikimrModifyOps()) { - if (KikimrRequireUnmodifiedOps() & newOp) { - TString message = TStringBuilder() << "Operation '" << newOp - << "' can't be performed on previously modified table: " << table; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - - if (KikimrReadOps() & newOp) { - TString message = TStringBuilder() << "Data modifications previously made to table '" << table - << "' in current transaction won't be seen by operation: '" << newOp << "'"; - if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message), strictDml, ctx)) { - return false; - } - } - - if (info->GetHasIndexTables()) { - TString message = TStringBuilder() << "Multiple modification of table with secondary indexes is not supported yet"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - } - - if ((KikimrRequireUnmodifiedOps() & newOp) && isolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { - TString message = TStringBuilder() - << "Operation '" << newOp << "' is only supported with SERIALIZABLE isolation level"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - - // TODO: KIKIMR-3206 - bool currentDelete = currentOps & (TYdbOperation::Delete | TYdbOperation::DeleteOn); - bool newUpdate = newOp == TYdbOperation::Update; - if (currentDelete && newUpdate) { - TString message = TStringBuilder() << "Operation '" << newOp - << "' may lead to unexpected results when applied to table with deleted rows: " << table; - if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_UPDATE_TABLE_WITH_DELETES, message), strictDml, ctx)) { - return false; - } - } - - currentOps |= newOp; - } - - return true; -} - const THashSet<TStringBuf>& KikimrDataSourceFunctions() { return Singleton<TKikimrData>()->DataSourceNames; } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 523387bbf38..36ce2160110 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -260,22 +260,15 @@ enum class TYdbOperation : ui32 { Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation) Y_DECLARE_OPERATORS_FOR_FLAGS(TYdbOperations) -class IKikimrTransactionContext : public TThrRefBase { -public: - virtual void Invalidate() = 0; - virtual void Finish() = 0; - - virtual bool IsInvalidated() const = 0; - virtual bool IsClosed() const = 0; - - virtual bool ApplyTableOperations(const TVector<NKqpProto::TKqpTableOp>& operations, - const TVector<NKqpProto::TKqpTableInfo>& tableInfo, NKikimrKqp::EIsolationLevel isolationLevel, - bool strictDml, EKikimrQueryType queryType, TExprContext& ctx) = 0; +const TYdbOperations& KikimrSchemeOps(); +const TYdbOperations& KikimrDataOps(); +const TYdbOperations& KikimrModifyOps(); +const TYdbOperations& KikimrReadOps(); +const TYdbOperations& KikimrRequireUnmodifiedOps(); - virtual ~IKikimrTransactionContext() = default; -}; +bool AddDmlIssue(const TIssue& issue, bool strictDml, TExprContext& ctx); -class TKikimrTransactionContextBase : public IKikimrTransactionContext { +class TKikimrTransactionContextBase : public TThrRefBase { public: THashMap<TString, TYdbOperations> TableOperations; THashMap<TKikimrPathId, TString> TableByIdMap; @@ -288,19 +281,19 @@ public: return EffectiveIsolationLevel.Defined(); } - bool IsInvalidated() const override { + bool IsInvalidated() const { return Invalidated; } - bool IsClosed() const override { + bool IsClosed() const { return Closed; } - void Finish() override { + virtual void Finish() { Closed = true; } - void Invalidate() override { + void Invalidate() { if (HasStarted()) { Invalidated = true; } @@ -315,13 +308,141 @@ public: Closed = false; } - bool ApplyTableOperations(const TVector<NKqpProto::TKqpTableOp>& operations, const TVector<NKqpProto::TKqpTableInfo>& tableInfo, - NKikimrKqp::EIsolationLevel isolationLevel, bool strictDml, EKikimrQueryType queryType, TExprContext& ctx) override; + template<class IterableKqpTableOps, class IterableKqpTableInfos> + bool ApplyTableOperations(const IterableKqpTableOps& operations, + const IterableKqpTableInfos& tableInfos, NKikimrKqp::EIsolationLevel isolationLevel, bool strictDml, + EKikimrQueryType queryType, TExprContext& ctx) + { + if (IsClosed()) { + TString message = TStringBuilder() << "Cannot perform operations on closed transaction."; + ctx.AddError(YqlIssue({}, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + + isolationLevel = EffectiveIsolationLevel + ? *EffectiveIsolationLevel + : isolationLevel; + + bool hasScheme = false; + bool hasData = false; + for (auto& pair : TableOperations) { + hasScheme = hasScheme || (pair.second & KikimrSchemeOps()); + hasData = hasData || (pair.second & KikimrDataOps()); + } + + THashMap<TString, NKqpProto::TKqpTableInfo> tableInfoMap; + for (const auto& info : tableInfos) { + tableInfoMap.insert(std::make_pair(info.GetTableName(), info)); + + TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId()); + TableByIdMap.insert(std::make_pair(pathId, info.GetTableName())); + } + + for (const auto& op : operations) { + const auto& table = op.GetTable(); + + auto newOp = TYdbOperation(op.GetOperation()); + TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); + + const auto info = tableInfoMap.FindPtr(table); + if (!info) { + TString message = TStringBuilder() + << "Unable to find table info for table '" << table << "'"; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message)); + return false; + } + + if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) { + TString message = TStringBuilder() << "Operation '" << newOp + << "' can't be performed in data query"; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + + if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) { + TString message = TStringBuilder() << "Operation '" << newOp + << "' can't be performed in scheme query"; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + + if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) { + TString message = TStringBuilder() << "Operation '" << newOp + << "' can't be performed in scan query"; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + + if (hasData && (newOp & KikimrSchemeOps()) || + hasScheme && (newOp & KikimrDataOps())) + { + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX)); + return false; + } + + if (Readonly && (newOp & KikimrModifyOps())) { + TString message = TStringBuilder() << "Operation '" << newOp + << "' can't be performed in read only transaction"; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + + auto& currentOps = TableOperations[table]; + + if (currentOps & KikimrModifyOps()) { + if (KikimrRequireUnmodifiedOps() & newOp) { + TString message = TStringBuilder() << "Operation '" << newOp + << "' can't be performed on previously modified table: " << table; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + + if (KikimrReadOps() & newOp) { + TString message = TStringBuilder() << "Data modifications previously made to table '" << table + << "' in current transaction won't be seen by operation: '" << newOp << "'"; + if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message), strictDml, ctx)) { + return false; + } + } + + if (info->GetHasIndexTables()) { + TString message = TStringBuilder() << "Multiple modification of table with secondary indexes is not supported yet"; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + } + + if ((KikimrRequireUnmodifiedOps() & newOp) && isolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { + TString message = TStringBuilder() + << "Operation '" << newOp << "' is only supported with SERIALIZABLE isolation level"; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + + // TODO: KIKIMR-3206 + bool currentDelete = currentOps & (TYdbOperation::Delete | TYdbOperation::DeleteOn); + bool newUpdate = newOp == TYdbOperation::Update; + if (currentDelete && newUpdate) { + TString message = TStringBuilder() << "Operation '" << newOp + << "' may lead to unexpected results when applied to table with deleted rows: " << table; + if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_UPDATE_TABLE_WITH_DELETES, message), strictDml, ctx)) { + return false; + } + } + + currentOps |= newOp; + } + + return true; + } + + virtual ~TKikimrTransactionContextBase() = default; + }; class TKikimrSessionContext : public TThrRefBase { public: - TKikimrSessionContext(TKikimrConfiguration::TPtr config, TIntrusivePtr<IKikimrTransactionContext> txCtx = nullptr) + TKikimrSessionContext(TKikimrConfiguration::TPtr config, TIntrusivePtr<TKikimrTransactionContextBase> txCtx = nullptr) : Configuration(config) , TablesData(MakeIntrusive<TKikimrTablesData>()) , QueryCtx(MakeIntrusive<TKikimrQueryContext>()) @@ -333,16 +454,16 @@ public: TKikimrConfiguration& Config() { return *Configuration; } TKikimrTablesData& Tables() { return *TablesData; } TKikimrQueryContext& Query() { return *QueryCtx; } - IKikimrTransactionContext& Tx() { Y_VERIFY(HasTx()); return *TxCtx; } + TKikimrTransactionContextBase& Tx() { Y_VERIFY(HasTx()); return *TxCtx; } TKikimrConfiguration::TPtr ConfigPtr() { return Configuration; } TIntrusivePtr<TKikimrTablesData> TablesPtr() { return TablesData; } TIntrusivePtr<TKikimrQueryContext> QueryPtr() { return QueryCtx; } - TIntrusivePtr<IKikimrTransactionContext> TxPtr() { return TxCtx; } + TIntrusivePtr<TKikimrTransactionContextBase> TxPtr() { return TxCtx; } bool HasTx() const { return !!TxCtx; } void ClearTx() { TxCtx.Reset(); } - void SetTx(TIntrusivePtr<IKikimrTransactionContext>& txCtx) { TxCtx.Reset(txCtx); } + void SetTx(TIntrusivePtr<TKikimrTransactionContextBase>& txCtx) { TxCtx.Reset(txCtx); } TString GetUserName() const { return UserName; @@ -376,11 +497,9 @@ private: TKikimrConfiguration::TPtr Configuration; TIntrusivePtr<TKikimrTablesData> TablesData; TIntrusivePtr<TKikimrQueryContext> QueryCtx; - TIntrusivePtr<IKikimrTransactionContext> TxCtx; + TIntrusivePtr<TKikimrTransactionContextBase> TxCtx; }; -bool AddDmlIssue(const TIssue& issue, bool strictDml, TExprContext& ctx); - TIntrusivePtr<IDataProvider> CreateKikimrDataSource( const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, TTypeAnnotationContext& types, diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index 43e710002e5..db334d30618 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -272,12 +272,6 @@ const TStringBuf& KikimrCommitModeFlush(); const TStringBuf& KikimrCommitModeRollback(); const TStringBuf& KikimrCommitModeScheme(); -const TYdbOperations& KikimrSchemeOps(); -const TYdbOperations& KikimrDataOps(); -const TYdbOperations& KikimrModifyOps(); -const TYdbOperations& KikimrReadOps(); -const TYdbOperations& KikimrRequireUnmodifiedOps(); - const TMap<TString, NKikimr::NUdf::EDataSlot>& KikimrSystemColumns(); bool IsKikimrSystemColumn(const TStringBuf columnName); |