diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-23 17:17:54 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-23 17:17:54 +0300 |
commit | af6d4963aeec3543ef895865a0a24fa2e039efa3 (patch) | |
tree | a3583500436e8938465aa53214593846480a9ac2 | |
parent | 7eed70475de2ad09d3549efa64e50ed7e0e7d63a (diff) | |
download | ydb-af6d4963aeec3543ef895865a0a24fa2e039efa3.tar.gz |
Enable pattern cache in DataShard
9 files changed, 61 insertions, 45 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index df8a320058..6c13bebfc1 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -5,6 +5,7 @@ #include "sys_tables.h" #include <ydb/core/engine/minikql/minikql_engine_host.h> +#include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/kqp/runtime/kqp_compute.h> #include <ydb/core/scheme/scheme_tablecell.h> #include <ydb/core/tx/datashard/range_ops.h> @@ -856,16 +857,17 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor ComputeCtx = MakeHolder<TKqpDatashardComputeContext>(self, *EngineHost, now); ComputeCtx->Database = &txc.DB; + KqpAlloc = MakeHolder<TScopedAlloc>(__LOCATION__, TAlignedPagePoolCounters(), AppData(ctx)->FunctionRegistry->SupportsSizedAllocators()); + KqpTypeEnv = MakeHolder<TTypeEnvironment>(*KqpAlloc); + KqpAlloc->Release(); + auto kqpApplyCtx = MakeHolder<TKqpDatashardApplyContext>(); kqpApplyCtx->Host = EngineHost.Get(); kqpApplyCtx->ShardTableStats = &ComputeCtx->GetDatashardCounters(); + kqpApplyCtx->Env = KqpTypeEnv.Get(); KqpApplyCtx.Reset(kqpApplyCtx.Release()); - KqpAlloc = MakeHolder<TScopedAlloc>(__LOCATION__, TAlignedPagePoolCounters(), AppData(ctx)->FunctionRegistry->SupportsSizedAllocators()); - KqpTypeEnv = MakeHolder<TTypeEnvironment>(*KqpAlloc); - KqpAlloc->Release(); - KqpExecCtx.FuncRegistry = AppData(ctx)->FunctionRegistry; KqpExecCtx.ComputeCtx = ComputeCtx.Get(); KqpExecCtx.ComputationFactory = GetKqpDatashardComputeFactory(ComputeCtx.Get()); @@ -874,6 +876,9 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor KqpExecCtx.ApplyCtx = KqpApplyCtx.Get(); KqpExecCtx.Alloc = KqpAlloc.Get(); KqpExecCtx.TypeEnv = KqpTypeEnv.Get(); + if (auto* rm = NKqp::TryGetKqpResourceManager()) { + KqpExecCtx.PatternCache = rm->GetPatternCache(); + } } TEngineBay::~TEngineBay() { diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index 0287e896af..d482af0b9e 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -124,6 +124,7 @@ public: IEngineFlatHost* Host = nullptr; TKqpTableStats* ShardTableStats = nullptr; TKqpTableStats* TaskTableStats = nullptr; + TTypeEnvironment* Env = nullptr; }; IComputationNode* WrapKqpWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, diff --git a/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp index 6bd478dcd6..3d076e1c67 100644 --- a/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp @@ -38,7 +38,7 @@ public: auto& engineCtx = *CheckedCast<TKqpDatashardApplyContext*>(&applyContext); TVector<TCell> keyTuple(Owner.KeyIndices.size()); - FillKeyTupleValue(Row, Owner.KeyIndices, Owner.RowTypes, keyTuple, Owner.Env); + FillKeyTupleValue(Row, Owner.KeyIndices, Owner.RowTypes, keyTuple, *engineCtx.Env); if (engineCtx.Host->IsPathErased(Owner.TableId)) { return; @@ -96,13 +96,12 @@ public: public: TKqpDeleteRowsWrapper(TComputationMutables& mutables, const TTableId& tableId, IComputationNode* rowsNode, - TVector<NScheme::TTypeInfo> rowTypes, TVector<ui32> keyIndices, const TTypeEnvironment& env) + TVector<NScheme::TTypeInfo> rowTypes, TVector<ui32> keyIndices) : TBase(mutables) , TableId(tableId) , RowsNode(rowsNode) , RowTypes(std::move(rowTypes)) , KeyIndices(std::move(keyIndices)) - , Env(env) {} private: @@ -115,7 +114,6 @@ private: IComputationNode* RowsNode; const TVector<NScheme::TTypeInfo> RowTypes; const TVector<ui32> KeyIndices; - const TTypeEnvironment& Env; }; } // namespace @@ -167,7 +165,7 @@ IComputationNode* WrapKqpDeleteRows(TCallable& callable, const TComputationNodeF } return new TKqpDeleteRowsWrapper(ctx.Mutables, tableId, - LocateNode(ctx.NodeLocator, *rowsNode.GetNode()), std::move(rowTypes), std::move(keyIndices), ctx.Env); + LocateNode(ctx.NodeLocator, *rowsNode.GetNode()), std::move(rowTypes), std::move(keyIndices)); } } // namespace NMiniKQL diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp index eb50e43406..d72063a50e 100644 --- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp @@ -89,10 +89,9 @@ class TKqpLookupRowsWrapper : public TStatelessFlowComputationNode<TKqpLookupRow public: TKqpLookupRowsWrapper(TComputationMutables& mutables, TKqpDatashardComputeContext& computeCtx, - const TTypeEnvironment& typeEnv, const TParseLookupTableResult& parseResult, IComputationNode* lookupKeysNode) + const TParseLookupTableResult& parseResult, IComputationNode* lookupKeysNode) : TBase(mutables, this, EValueRepresentation::Boxed) , ComputeCtx(computeCtx) - , TypeEnv(typeEnv) , ParseResult(parseResult) , LookupKeysNode(lookupKeysNode) , ColumnTags(ParseResult.Columns) @@ -116,7 +115,7 @@ public: switch (keysValues.Fetch(key)) { case NUdf::EFetchStatus::Ok: { TVector<TCell> keyCells(ParseResult.KeyIndices.size()); - FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, keyCells, TypeEnv); + FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, keyCells, ctx.TypeEnv); NUdf::TUnboxedValue result; TKqpTableStats stats; @@ -160,7 +159,6 @@ private: private: TKqpDatashardComputeContext& ComputeCtx; - const TTypeEnvironment& TypeEnv; TParseLookupTableResult ParseResult; IComputationNode* LookupKeysNode; TSmallVec<TTag> ColumnTags; @@ -174,10 +172,9 @@ class TKqpLookupTableWrapper : public TStatelessFlowComputationNode<TKqpLookupTa public: TKqpLookupTableWrapper(TComputationMutables& mutables, TKqpDatashardComputeContext& computeCtx, - const TTypeEnvironment& typeEnv, const TParseLookupTableResult& parseResult, IComputationNode* lookupKeysNode) + const TParseLookupTableResult& parseResult, IComputationNode* lookupKeysNode) : TBase(mutables, this, EValueRepresentation::Boxed) , ComputeCtx(computeCtx) - , TypeEnv(typeEnv) , ParseResult(parseResult) , LookupKeysNode(lookupKeysNode) , ColumnTags(ParseResult.Columns) @@ -200,10 +197,10 @@ public: MKQL_ENSURE_S(tableInfo); TVector<TCell> fromCells(tableInfo->KeyColumns.size()); - FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, fromCells, TypeEnv); + FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, fromCells, ctx.TypeEnv); TVector<TCell> toCells(ParseResult.KeyIndices.size()); - FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, toCells, TypeEnv); + FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, toCells, ctx.TypeEnv); auto range = TTableRange(fromCells, true, toCells, true); @@ -258,7 +255,6 @@ private: private: TKqpDatashardComputeContext& ComputeCtx; - const TTypeEnvironment& TypeEnv; TParseLookupTableResult ParseResult; IComputationNode* LookupKeysNode; TSmallVec<TTag> ColumnTags; @@ -282,9 +278,9 @@ IComputationNode* WrapKqpLookupTableInternal(TCallable& callable, const TComputa MKQL_ENSURE_S(tableInfo); if (tableInfo->KeyColumns.size() == parseResult.KeyIndices.size()) { - return new TKqpLookupRowsWrapper(ctx.Mutables, computeCtx, ctx.Env, parseResult, lookupKeysNode); + return new TKqpLookupRowsWrapper(ctx.Mutables, computeCtx, parseResult, lookupKeysNode); } else { - return new TKqpLookupTableWrapper(ctx.Mutables, computeCtx, ctx.Env, parseResult, lookupKeysNode); + return new TKqpLookupTableWrapper(ctx.Mutables, computeCtx, parseResult, lookupKeysNode); } } diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp index bf87ae31b3..08b10e0791 100644 --- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp @@ -186,11 +186,10 @@ template <bool IsReverse> class TKqpWideReadTableWrapperBase : public TStatelessWideFlowCodegeneratorNode<TKqpWideReadTableWrapperBase<IsReverse>> { public: TKqpWideReadTableWrapperBase(const TTableId& tableId, TKqpDatashardComputeContext& computeCtx, - const TTypeEnvironment& typeEnv, const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys) + const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys) : TStatelessWideFlowCodegeneratorNode<TKqpWideReadTableWrapperBase<IsReverse>>(this) , TableId(tableId) , ComputeCtx(computeCtx) - , TypeEnv(typeEnv) , SystemColumnTags(systemColumnTags) , SkipNullKeys(skipNullKeys) , ShardTableStats(ComputeCtx.GetDatashardCounters()) @@ -240,7 +239,6 @@ protected: protected: const TTableId TableId; TKqpDatashardComputeContext& ComputeCtx; - const TTypeEnvironment& TypeEnv; TSmallVec<TTag> SystemColumnTags; TSmallVec<bool> SkipNullKeys; TKqpTableStats& ShardTableStats; @@ -253,11 +251,10 @@ protected: template <bool IsReverse> class TKqpWideReadTableWrapper : public TKqpWideReadTableWrapperBase<IsReverse> { public: - TKqpWideReadTableWrapper(TKqpDatashardComputeContext& computeCtx, const TTypeEnvironment& typeEnv, - const TParseReadTableResult& parseResult, IComputationNode* fromNode, IComputationNode* toNode, - IComputationNode* itemsLimit) - : TKqpWideReadTableWrapperBase<IsReverse>(parseResult.TableId, computeCtx, typeEnv, - parseResult.SystemColumns, parseResult.SkipNullKeys) + TKqpWideReadTableWrapper(TKqpDatashardComputeContext& computeCtx, const TParseReadTableResult& parseResult, + IComputationNode* fromNode, IComputationNode* toNode, IComputationNode* itemsLimit) + : TKqpWideReadTableWrapperBase<IsReverse>(parseResult.TableId, computeCtx, parseResult.SystemColumns, + parseResult.SkipNullKeys) , ParseResult(parseResult) , FromNode(fromNode) , ToNode(toNode) @@ -272,10 +269,10 @@ private: EFetchResult ReadValue(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const final { if (!this->Iterator) { TVector<TCell> fromCells; - BuildKeyTupleCells(ParseResult.FromTuple->GetType(), FromNode->GetValue(ctx), fromCells, this->TypeEnv); + BuildKeyTupleCells(ParseResult.FromTuple->GetType(), FromNode->GetValue(ctx), fromCells, ctx.TypeEnv); TVector<TCell> toCells; - BuildKeyTupleCells(ParseResult.ToTuple->GetType(), ToNode->GetValue(ctx), toCells, this->TypeEnv); + BuildKeyTupleCells(ParseResult.ToTuple->GetType(), ToNode->GetValue(ctx), toCells, ctx.TypeEnv); auto range = TTableRange(fromCells, ParseResult.FromInclusive, toCells, ParseResult.ToInclusive); @@ -317,10 +314,10 @@ private: template <bool IsReverse> class TKqpWideReadTableRangesWrapper : public TKqpWideReadTableWrapperBase<IsReverse> { public: - TKqpWideReadTableRangesWrapper(TKqpDatashardComputeContext& computeCtx, const TTypeEnvironment& typeEnv, + TKqpWideReadTableRangesWrapper(TKqpDatashardComputeContext& computeCtx, const TParseReadTableRangesResult& parseResult, IComputationNode* rangesNode, IComputationNode* itemsLimit) - : TKqpWideReadTableWrapperBase<IsReverse>(parseResult.TableId, computeCtx, typeEnv, - parseResult.SystemColumns, parseResult.SkipNullKeys) + : TKqpWideReadTableWrapperBase<IsReverse>(parseResult.TableId, computeCtx, parseResult.SystemColumns, + parseResult.SkipNullKeys) , ParseResult(parseResult) , RangesNode(rangesNode) , ItemsLimit(itemsLimit) @@ -331,7 +328,7 @@ private: if (!RangeId) { const auto localTid = this->ComputeCtx.GetLocalTableId(ParseResult.TableId); const auto* tableInfo = this->ComputeCtx.Database->GetScheme().GetTableInfo(localTid); - Ranges = CreateTableRanges<IsReverse>(ParseResult, RangesNode, this->TypeEnv, ctx, tableInfo->KeyColumns.size()); + Ranges = CreateTableRanges<IsReverse>(ParseResult, RangesNode, ctx.TypeEnv, ctx, tableInfo->KeyColumns.size()); RangeId = 0; if (ItemsLimit) { @@ -408,10 +405,10 @@ IComputationNode* WrapKqpWideReadTableRanges(TCallable& callable, const TComputa } if (parseResult.Reverse) { - return new TKqpWideReadTableRangesWrapper<true>(computeCtx, ctx.Env, parseResult, rangesNode, itemsLimit); + return new TKqpWideReadTableRangesWrapper<true>(computeCtx, parseResult, rangesNode, itemsLimit); } - return new TKqpWideReadTableRangesWrapper<false>(computeCtx, ctx.Env, parseResult, rangesNode, itemsLimit); + return new TKqpWideReadTableRangesWrapper<false>(computeCtx, parseResult, rangesNode, itemsLimit); } IComputationNode* WrapKqpWideReadTable(TCallable& callable, const TComputationNodeFactoryContext& ctx, @@ -433,10 +430,10 @@ IComputationNode* WrapKqpWideReadTable(TCallable& callable, const TComputationNo } if (parseResult.Reverse) { - return new TKqpWideReadTableWrapper<true>(computeCtx, ctx.Env, parseResult, fromNode, toNode, itemsLimit); + return new TKqpWideReadTableWrapper<true>(computeCtx, parseResult, fromNode, toNode, itemsLimit); } - return new TKqpWideReadTableWrapper<false>(computeCtx, ctx.Env, parseResult, fromNode, toNode, itemsLimit); + return new TKqpWideReadTableWrapper<false>(computeCtx, parseResult, fromNode, toNode, itemsLimit); } } // namespace NMiniKQL diff --git a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp index d6a95c1eb4..4719669ead 100644 --- a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp @@ -44,7 +44,7 @@ public: auto& dsApplyCtx = *CheckedCast<TKqpDatashardApplyContext*>(&applyContext); TVector<TCell> keyTuple(Owner.KeyIndices.size()); - FillKeyTupleValue(Row, Owner.KeyIndices, Owner.RowTypes, keyTuple, Owner.Env); + FillKeyTupleValue(Row, Owner.KeyIndices, Owner.RowTypes, keyTuple, *dsApplyCtx.Env); if (dsApplyCtx.Host->IsPathErased(Owner.TableId)) { return; @@ -76,7 +76,7 @@ public: // NOTE: We have to copy values here as some values inlined in TUnboxedValue // cannot be inlined in TCell. - command.Value = MakeCell(type, value, Owner.Env, true); + command.Value = MakeCell(type, value, *dsApplyCtx.Env, true); commands.emplace_back(std::move(command)); } @@ -132,14 +132,13 @@ public: public: TKqpUpsertRowsWrapper(TComputationMutables& mutables, const TTableId& tableId, IComputationNode* rowsNode, TVector<NScheme::TTypeInfo>&& rowTypes, TVector<ui32>&& keyIndices, - TVector<TUpsertColumn>&& upsertColumns, const TTypeEnvironment& env) + TVector<TUpsertColumn>&& upsertColumns) : TBase(mutables) , TableId(tableId) , RowsNode(rowsNode) , RowTypes(std::move(rowTypes)) , KeyIndices(std::move(keyIndices)) , UpsertColumns(std::move(upsertColumns)) - , Env(env) {} private: @@ -153,7 +152,6 @@ private: TVector<NScheme::TTypeInfo> RowTypes; TVector<ui32> KeyIndices; TVector<TUpsertColumn> UpsertColumns; - const TTypeEnvironment& Env; }; } // namespace @@ -227,7 +225,7 @@ IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeF return new TKqpUpsertRowsWrapper(ctx.Mutables, tableId, LocateNode(ctx.NodeLocator, *rowsNode.GetNode()), std::move(rowTypes), std::move(keyIndices), - std::move(upsertColumns), ctx.Env); + std::move(upsertColumns)); } } // namespace NMiniKQL diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index b444f11ef6..a97aa62869 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -419,7 +419,8 @@ public: // clone pattern using TDqTaskRunner's alloc auto opts = CreatePatternOpts(Alloc(), TypeEnv()); - ProgramParsed.CompGraph = ProgramParsed.GetPattern()->Clone(opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider)); + ProgramParsed.CompGraph = ProgramParsed.GetPattern()->Clone( + opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &TypeEnv())); TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp index 7c199e000d..f7249bea95 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp @@ -36,6 +36,7 @@ TComputationContext::TComputationContext(const THolderFactory& holderFactory, , TimeProvider(opts.TimeProvider) , ArrowMemoryPool(arrowMemoryPool) , WideFields(mutables.CurWideFieldsIndex, nullptr) + , TypeEnv(*opts.TypeEnv) { std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h index 6ea0d9f15a..f113752a12 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h @@ -54,7 +54,20 @@ struct TComputationOptsFull: public TComputationOpts { , ValidatePolicy(validatePolicy) , SecureParamsProvider(secureParamsProvider) {} + + TComputationOptsFull(IStatsRegistry* stats, TTypeEnvironment* typeEnv, IRandomProvider& randomProvider, + ITimeProvider& timeProvider, NUdf::EValidatePolicy validatePolicy, const NUdf::ISecureParamsProvider* secureParamsProvider) + : TComputationOpts(stats) + , AllocState(typeEnv->GetAllocator().Ref()) + , TypeEnv(typeEnv) + , RandomProvider(randomProvider) + , TimeProvider(timeProvider) + , ValidatePolicy(validatePolicy) + , SecureParamsProvider(secureParamsProvider) + {} + TAllocState& AllocState; + TTypeEnvironment* TypeEnv = nullptr; IRandomProvider& RandomProvider; ITimeProvider& TimeProvider; NUdf::EValidatePolicy ValidatePolicy; @@ -106,12 +119,14 @@ struct TComputationContext : public TComputationContextLLVM { bool ExecuteLLVM = true; arrow::MemoryPool& ArrowMemoryPool; std::vector<NUdf::TUnboxedValue*> WideFields; + TTypeEnvironment& TypeEnv; TComputationContext(const THolderFactory& holderFactory, const NUdf::IValueBuilder* builder, TComputationOptsFull& opts, const TComputationMutables& mutables, arrow::MemoryPool& arrowMemoryPool); + ~TComputationContext(); // Returns true if current usage delta exceeds the memory limit @@ -345,6 +360,10 @@ struct TComputationPatternOpts { TComputationOptsFull ToComputationOptions(IRandomProvider& randomProvider, ITimeProvider& timeProvider, TAllocState* allocStatePtr = nullptr) const { return TComputationOptsFull(Stats, allocStatePtr ? *allocStatePtr : AllocState, randomProvider, timeProvider, ValidatePolicy, SecureParamsProvider); } + + TComputationOptsFull ToComputationOptions(IRandomProvider& randomProvider, ITimeProvider& timeProvider, TTypeEnvironment* typeEnv) const { + return TComputationOptsFull(Stats, typeEnv, randomProvider, timeProvider, ValidatePolicy, SecureParamsProvider); + } }; class IComputationPattern: public TAtomicRefCount<IComputationPattern> { |