aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Makunin <igor.makunin@gmail.com>2022-02-15 14:14:14 +0300
committerIgor Makunin <igor.makunin@gmail.com>2022-02-15 14:14:14 +0300
commit86e0da74a56bc3d661e660f072ec7f627f219cd0 (patch)
tree9dbb89e89f58c754993a6283f89c9e100cda90a0
parent0d162201623457fcf30d4c290fc70b1f2833cf34 (diff)
downloadydb-86e0da74a56bc3d661e660f072ec7f627f219cd0.tar.gz
KIKIMR-14323: support SplitByLoad at NewEngine
ref:d99741d3ae72999deeebfe8e16b2b118d7f6d168
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.h10
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_read_table.cpp31
4 files changed, 32 insertions, 17 deletions
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index 6e227a5c98e..885c6337806 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -274,5 +274,9 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated
return ret;
}
+void TKqpDatashardComputeContext::AddKeyAccessSample(const TTableId& tableId, const TArrayRef<const TCell>& key) {
+ Shard->GetKeyAccessSampler()->AddSample(tableId, key);
+}
+
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h
index 4bbb8fa0de7..18627630814 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.h
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.h
@@ -58,6 +58,8 @@ public:
void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; };
bool IsTabletNotReady() const { return TabletNotReady; }
+ void AddKeyAccessSample(const TTableId& tableId, const TArrayRef<const TCell>& key);
+
public:
NTable::TDatabase* Database = nullptr;
@@ -80,12 +82,12 @@ public:
TSmallVec<NTable::TTag> ExtractTags(const TSmallVec<TKqpComputeContextBase::TColumn>& columns);
-bool TryFetchRow(NTable::TTableIt& iterator, NYql::NUdf::TUnboxedValue& row, TComputationContext& ctx,
- TKqpTableStats& tableStats, const TKqpDatashardComputeContext& computeCtx,
+bool TryFetchRow(const TTableId& tableId, NTable::TTableIt& iterator, NYql::NUdf::TUnboxedValue& row,
+ TComputationContext& ctx, TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx,
const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys);
-bool TryFetchRow(NTable::TTableReverseIt& iterator, NYql::NUdf::TUnboxedValue& row, TComputationContext& ctx,
- TKqpTableStats& tableStats, const TKqpDatashardComputeContext& computeCtx,
+bool TryFetchRow(const TTableId& tableId, NTable::TTableReverseIt& iterator, NYql::NUdf::TUnboxedValue& row,
+ TComputationContext& ctx, TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx,
const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys);
void FetchRow(const TDbTupleRef& dbTuple, NYql::NUdf::TUnboxedValue& row, TComputationContext& ctx,
diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
index 24582c2664d..7d6ed38420e 100644
--- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
@@ -125,6 +125,7 @@ public:
}
ComputeCtx.ReadTable(ParseResult.TableId, keyCells);
+ ComputeCtx.AddKeyAccessSample(ParseResult.TableId, keyCells);
NTable::TRowState dbRow;
NTable::TSelectStats stats;
@@ -262,7 +263,8 @@ public:
TUnboxedValue result;
TKqpTableStats tableStats;
- auto fetched = TryFetchRow(*Iterator, result, ctx, tableStats, ComputeCtx, SystemColumnTags, ParseResult.SkipNullKeys);
+ auto fetched = TryFetchRow(ParseResult.TableId, *Iterator, result, ctx, tableStats, ComputeCtx,
+ SystemColumnTags, ParseResult.SkipNullKeys);
if (Iterator->Stats.InvisibleRowSkips) {
ComputeCtx.BreakSetLocks();
diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
index 5952dc15c00..ded2e109aa6 100644
--- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
@@ -195,9 +195,10 @@ void CreateRangePoints(ui64 localTid, const TSerializedTableRange& serializedTab
template <bool IsReverse>
class TKqpWideReadTableWrapperBase : public TStatelessWideFlowCodegeneratorNode<TKqpWideReadTableWrapperBase<IsReverse>> {
public:
- TKqpWideReadTableWrapperBase(TKqpDatashardComputeContext& computeCtx, const TTypeEnvironment& typeEnv,
- const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys)
+ TKqpWideReadTableWrapperBase(const TTableId& tableId, TKqpDatashardComputeContext& computeCtx,
+ const TTypeEnvironment& typeEnv, const TSmallVec<TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys)
: TStatelessWideFlowCodegeneratorNode<TKqpWideReadTableWrapperBase<IsReverse>>(this)
+ , TableId(tableId)
, ComputeCtx(computeCtx)
, TypeEnv(typeEnv)
, SystemColumnTags(systemColumnTags)
@@ -228,6 +229,8 @@ protected:
}
TDbTupleRef rowKey = Iterator->GetKey();
+ ComputeCtx.AddKeyAccessSample(TableId, rowKey.Cells());
+
ui64 deletedRowSkips = std::exchange(Iterator->Stats.DeletedRowSkips, 0);
ui64 invisibleRowSkips = std::exchange(Iterator->Stats.InvisibleRowSkips, 0);
@@ -314,6 +317,7 @@ protected:
}
protected:
+ const TTableId TableId;
TKqpDatashardComputeContext& ComputeCtx;
const TTypeEnvironment& TypeEnv;
TSmallVec<TTag> SystemColumnTags;
@@ -331,7 +335,8 @@ public:
TKqpWideReadTableWrapper(TKqpDatashardComputeContext& computeCtx, const TTypeEnvironment& typeEnv,
const TParseReadTableResult& parseResult, IComputationNode* fromNode, IComputationNode* toNode,
IComputationNode* itemsLimit)
- : TKqpWideReadTableWrapperBase<IsReverse>(computeCtx, typeEnv, ExtractTags(parseResult.SystemColumns), parseResult.SkipNullKeys)
+ : TKqpWideReadTableWrapperBase<IsReverse>(parseResult.TableId, computeCtx, typeEnv,
+ ExtractTags(parseResult.SystemColumns), parseResult.SkipNullKeys)
, ParseResult(parseResult)
, FromNode(fromNode)
, ToNode(toNode)
@@ -401,7 +406,8 @@ class TKqpWideReadTableRangesWrapper : public TKqpWideReadTableWrapperBase<IsRev
public:
TKqpWideReadTableRangesWrapper(TKqpDatashardComputeContext& computeCtx, const TTypeEnvironment& typeEnv,
const TParseReadTableRangesResult& parseResult, IComputationNode* rangesNode, IComputationNode* itemsLimit)
- : TKqpWideReadTableWrapperBase<IsReverse>(computeCtx, typeEnv, ExtractTags(parseResult.SystemColumns), parseResult.SkipNullKeys)
+ : TKqpWideReadTableWrapperBase<IsReverse>(parseResult.TableId, computeCtx, typeEnv,
+ ExtractTags(parseResult.SystemColumns), parseResult.SkipNullKeys)
, ParseResult(parseResult)
, RangesNode(rangesNode)
, ItemsLimit(itemsLimit)
@@ -513,12 +519,13 @@ void FetchRowImpl(const TDbTupleRef& dbTuple, TUnboxedValue& row, TComputationCo
}
template <typename TTableIterator>
-bool TryFetchRowImpl(TTableIterator& iterator, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats,
- const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
+bool TryFetchRowImpl(const TTableId& tableId, TTableIterator& iterator, TUnboxedValue& row, TComputationContext& ctx,
+ TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
const TSmallVec<bool>& skipNullKeys)
{
while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) {
TDbTupleRef rowKey = iterator.GetKey();
+ computeCtx.AddKeyAccessSample(tableId, rowKey.Cells());
Y_VERIFY(skipNullKeys.size() <= rowKey.ColumnCount);
bool skipRow = false;
@@ -543,18 +550,18 @@ bool TryFetchRowImpl(TTableIterator& iterator, TUnboxedValue& row, TComputationC
} // namespace
-bool TryFetchRow(TTableIt& iterator, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats,
- const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
+bool TryFetchRow(const TTableId& tableId, TTableIt& iterator, TUnboxedValue& row, TComputationContext& ctx,
+ TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
const TSmallVec<bool>& skipNullKeys)
{
- return TryFetchRowImpl(iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys);
+ return TryFetchRowImpl(tableId, iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys);
}
-bool TryFetchRow(TTableReverseIt& iterator, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats,
- const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
+bool TryFetchRow(const TTableId& tableId, TTableReverseIt& iterator, TUnboxedValue& row, TComputationContext& ctx,
+ TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
const TSmallVec<bool>& skipNullKeys)
{
- return TryFetchRowImpl(iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys);
+ return TryFetchRowImpl(tableId, iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys);
}
void FetchRow(const TDbTupleRef& dbTuple, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats,