diff options
author | Igor Makunin <igor.makunin@gmail.com> | 2022-02-15 14:14:14 +0300 |
---|---|---|
committer | Igor Makunin <igor.makunin@gmail.com> | 2022-02-15 14:14:14 +0300 |
commit | 86e0da74a56bc3d661e660f072ec7f627f219cd0 (patch) | |
tree | 9dbb89e89f58c754993a6283f89c9e100cda90a0 | |
parent | 0d162201623457fcf30d4c290fc70b1f2833cf34 (diff) | |
download | ydb-86e0da74a56bc3d661e660f072ec7f627f219cd0.tar.gz |
KIKIMR-14323: support SplitByLoad at NewEngine
ref:d99741d3ae72999deeebfe8e16b2b118d7f6d168
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_read_table.cpp | 31 |
4 files changed, 32 insertions, 17 deletions
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index 6e227a5c98e..885c6337806 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -274,5 +274,9 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated return ret; } +void TKqpDatashardComputeContext::AddKeyAccessSample(const TTableId& tableId, const TArrayRef<const TCell>& key) { + Shard->GetKeyAccessSampler()->AddSample(tableId, key); +} + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index 4bbb8fa0de7..18627630814 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -58,6 +58,8 @@ public: void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; }; bool IsTabletNotReady() const { return TabletNotReady; } + void AddKeyAccessSample(const TTableId& tableId, const TArrayRef<const TCell>& key); + public: NTable::TDatabase* Database = nullptr; @@ -80,12 +82,12 @@ public: TSmallVec<NTable::TTag> ExtractTags(const TSmallVec<TKqpComputeContextBase::TColumn>& columns); -bool TryFetchRow(NTable::TTableIt& iterator, NYql::NUdf::TUnboxedValue& row, TComputationContext& ctx, - TKqpTableStats& tableStats, const TKqpDatashardComputeContext& computeCtx, +bool TryFetchRow(const TTableId& tableId, NTable::TTableIt& iterator, NYql::NUdf::TUnboxedValue& row, + TComputationContext& ctx, TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys); -bool TryFetchRow(NTable::TTableReverseIt& iterator, NYql::NUdf::TUnboxedValue& row, TComputationContext& ctx, - TKqpTableStats& tableStats, const TKqpDatashardComputeContext& computeCtx, +bool TryFetchRow(const TTableId& tableId, NTable::TTableReverseIt& iterator, NYql::NUdf::TUnboxedValue& row, + TComputationContext& ctx, TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys); void FetchRow(const TDbTupleRef& dbTuple, NYql::NUdf::TUnboxedValue& row, TComputationContext& ctx, diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp index 24582c2664d..7d6ed38420e 100644 --- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp @@ -125,6 +125,7 @@ public: } ComputeCtx.ReadTable(ParseResult.TableId, keyCells); + ComputeCtx.AddKeyAccessSample(ParseResult.TableId, keyCells); NTable::TRowState dbRow; NTable::TSelectStats stats; @@ -262,7 +263,8 @@ public: TUnboxedValue result; TKqpTableStats tableStats; - auto fetched = TryFetchRow(*Iterator, result, ctx, tableStats, ComputeCtx, SystemColumnTags, ParseResult.SkipNullKeys); + auto fetched = TryFetchRow(ParseResult.TableId, *Iterator, result, ctx, tableStats, ComputeCtx, + SystemColumnTags, ParseResult.SkipNullKeys); if (Iterator->Stats.InvisibleRowSkips) { ComputeCtx.BreakSetLocks(); diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp index 5952dc15c00..ded2e109aa6 100644 --- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp @@ -195,9 +195,10 @@ void CreateRangePoints(ui64 localTid, const TSerializedTableRange& serializedTab template <bool IsReverse> class TKqpWideReadTableWrapperBase : public TStatelessWideFlowCodegeneratorNode<TKqpWideReadTableWrapperBase<IsReverse>> { public: - TKqpWideReadTableWrapperBase(TKqpDatashardComputeContext& computeCtx, const TTypeEnvironment& typeEnv, - const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys) + TKqpWideReadTableWrapperBase(const TTableId& tableId, TKqpDatashardComputeContext& computeCtx, + const TTypeEnvironment& typeEnv, const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys) : TStatelessWideFlowCodegeneratorNode<TKqpWideReadTableWrapperBase<IsReverse>>(this) + , TableId(tableId) , ComputeCtx(computeCtx) , TypeEnv(typeEnv) , SystemColumnTags(systemColumnTags) @@ -228,6 +229,8 @@ protected: } TDbTupleRef rowKey = Iterator->GetKey(); + ComputeCtx.AddKeyAccessSample(TableId, rowKey.Cells()); + ui64 deletedRowSkips = std::exchange(Iterator->Stats.DeletedRowSkips, 0); ui64 invisibleRowSkips = std::exchange(Iterator->Stats.InvisibleRowSkips, 0); @@ -314,6 +317,7 @@ protected: } protected: + const TTableId TableId; TKqpDatashardComputeContext& ComputeCtx; const TTypeEnvironment& TypeEnv; TSmallVec<TTag> SystemColumnTags; @@ -331,7 +335,8 @@ public: TKqpWideReadTableWrapper(TKqpDatashardComputeContext& computeCtx, const TTypeEnvironment& typeEnv, const TParseReadTableResult& parseResult, IComputationNode* fromNode, IComputationNode* toNode, IComputationNode* itemsLimit) - : TKqpWideReadTableWrapperBase<IsReverse>(computeCtx, typeEnv, ExtractTags(parseResult.SystemColumns), parseResult.SkipNullKeys) + : TKqpWideReadTableWrapperBase<IsReverse>(parseResult.TableId, computeCtx, typeEnv, + ExtractTags(parseResult.SystemColumns), parseResult.SkipNullKeys) , ParseResult(parseResult) , FromNode(fromNode) , ToNode(toNode) @@ -401,7 +406,8 @@ class TKqpWideReadTableRangesWrapper : public TKqpWideReadTableWrapperBase<IsRev public: TKqpWideReadTableRangesWrapper(TKqpDatashardComputeContext& computeCtx, const TTypeEnvironment& typeEnv, const TParseReadTableRangesResult& parseResult, IComputationNode* rangesNode, IComputationNode* itemsLimit) - : TKqpWideReadTableWrapperBase<IsReverse>(computeCtx, typeEnv, ExtractTags(parseResult.SystemColumns), parseResult.SkipNullKeys) + : TKqpWideReadTableWrapperBase<IsReverse>(parseResult.TableId, computeCtx, typeEnv, + ExtractTags(parseResult.SystemColumns), parseResult.SkipNullKeys) , ParseResult(parseResult) , RangesNode(rangesNode) , ItemsLimit(itemsLimit) @@ -513,12 +519,13 @@ void FetchRowImpl(const TDbTupleRef& dbTuple, TUnboxedValue& row, TComputationCo } template <typename TTableIterator> -bool TryFetchRowImpl(TTableIterator& iterator, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats, - const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, +bool TryFetchRowImpl(const TTableId& tableId, TTableIterator& iterator, TUnboxedValue& row, TComputationContext& ctx, + TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys) { while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) { TDbTupleRef rowKey = iterator.GetKey(); + computeCtx.AddKeyAccessSample(tableId, rowKey.Cells()); Y_VERIFY(skipNullKeys.size() <= rowKey.ColumnCount); bool skipRow = false; @@ -543,18 +550,18 @@ bool TryFetchRowImpl(TTableIterator& iterator, TUnboxedValue& row, TComputationC } // namespace -bool TryFetchRow(TTableIt& iterator, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats, - const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, +bool TryFetchRow(const TTableId& tableId, TTableIt& iterator, TUnboxedValue& row, TComputationContext& ctx, + TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys) { - return TryFetchRowImpl(iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys); + return TryFetchRowImpl(tableId, iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys); } -bool TryFetchRow(TTableReverseIt& iterator, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats, - const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, +bool TryFetchRow(const TTableId& tableId, TTableReverseIt& iterator, TUnboxedValue& row, TComputationContext& ctx, + TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys) { - return TryFetchRowImpl(iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys); + return TryFetchRowImpl(tableId, iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys); } void FetchRow(const TDbTupleRef& dbTuple, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats, |