aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkungurtsev <kungasc@ydb.tech>2024-11-12 13:02:43 +0100
committerGitHub <noreply@github.com>2024-11-12 15:02:43 +0300
commitedeb17d54b12a0a225e43f890a31f18208560c0c (patch)
tree4e79ea762e365c41ed6c0064770fe01025a6b176
parent91cc64219e78a1b47be64f2b815027c1fcdd3f1c (diff)
downloadydb-edeb17d54b12a0a225e43f890a31f18208560c0c.tar.gz
Request DataShard compaction if scheme has been changed (#11147)
-rw-r--r--ydb/core/protos/table_stats.proto4
-rw-r--r--ydb/core/tablet_flat/flat_comp.h2
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp100
-rw-r--r--ydb/core/tablet_flat/flat_executor.h4
-rw-r--r--ydb/core/tablet_flat/flat_stat_table.cpp12
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp13
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.h1
-rw-r--r--ydb/core/tablet_flat/ut/flat_comp_ut_common.h2
-rw-r--r--ydb/core/tx/datashard/datashard.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard__compaction.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard__stats.cpp163
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h6
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp16
-rw-r--r--ydb/core/tx/datashard/datashard_ut_stats.cpp237
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp57
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.h19
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h2
-rw-r--r--ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp4
-rw-r--r--ydb/services/ydb/ydb_ut.cpp8
25 files changed, 527 insertions, 149 deletions
diff --git a/ydb/core/protos/table_stats.proto b/ydb/core/protos/table_stats.proto
index c80e972335..3958b1b221 100644
--- a/ydb/core/protos/table_stats.proto
+++ b/ydb/core/protos/table_stats.proto
@@ -68,4 +68,8 @@ message TTableStats {
optional TStoragePoolsStats StoragePools = 31;
optional uint64 ByKeyFilterSize = 32;
+
+ // denotes that datashard should be background compacted
+ // even if it is single parted
+ optional bool HasSchemaChanges = 33;
}
diff --git a/ydb/core/tablet_flat/flat_comp.h b/ydb/core/tablet_flat/flat_comp.h
index 5b49992129..923624d041 100644
--- a/ydb/core/tablet_flat/flat_comp.h
+++ b/ydb/core/tablet_flat/flat_comp.h
@@ -166,7 +166,7 @@ namespace NTable {
/**
* Returns row schema of the specified table
*/
- virtual TIntrusiveConstPtr<TRowScheme> RowScheme(ui32 table) = 0;
+ virtual TIntrusiveConstPtr<TRowScheme> RowScheme(ui32 table) const = 0;
/**
* Returns schema of the specified table
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index bd06779142..ffa0d56b43 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -4293,7 +4293,7 @@ const NTable::TScheme& TExecutor::DatabaseScheme()
return Scheme();
}
-TIntrusiveConstPtr<NTable::TRowScheme> TExecutor::RowScheme(ui32 table)
+TIntrusiveConstPtr<NTable::TRowScheme> TExecutor::RowScheme(ui32 table) const
{
return Database->GetRowScheme(table);
}
@@ -4334,6 +4334,80 @@ const NTable::TRowVersionRanges& TExecutor::TableRemovedRowVersions(ui32 table)
return Database->GetRemovedRowVersions(table);
}
+bool TExecutor::HasSchemaChanges(ui32 table) const {
+ auto *tableInfo = Scheme().GetTableInfo(table);
+ auto rowScheme = RowScheme(table);
+ if (!tableInfo || !rowScheme) {
+ return false;
+ }
+
+ auto subset = Database->Subset(table, NTable::TEpoch::Max(), { } , { });
+ for (const auto& partView : subset->Flatten) {
+ if (HasSchemaChanges(partView, *tableInfo, *rowScheme)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+bool TExecutor::HasSchemaChanges(const NTable::TPartView& partView, const NTable::TScheme::TTableInfo& tableInfo, const NTable::TRowScheme& rowScheme) const {
+ if (partView.Part->Stat.Rows == 0) {
+ return false;
+ }
+
+ { // Check by key filter existence
+ bool partByKeyFilter = bool(partView->ByKey);
+ bool schemeByKeyFilter = tableInfo.ByKeyFilter;
+ if (partByKeyFilter != schemeByKeyFilter) {
+ return true;
+ }
+ }
+
+ { // Check B-Tree index existence
+ if (AppData()->FeatureFlags.GetEnableLocalDBBtreeIndex() && !partView->IndexPages.HasBTree()) {
+ return true;
+ }
+ }
+
+ { // Check families
+ size_t partFamiliesCount = partView->GroupsCount;
+ size_t schemeFamiliesCount = rowScheme.Families.size();
+ if (partFamiliesCount != schemeFamiliesCount) {
+ return true;
+ }
+
+ for (size_t index : xrange(rowScheme.Families.size())) {
+ auto familyId = rowScheme.Families[index];
+ static const NTable::TScheme::TFamily defaultFamilySettings;
+ const auto& family = tableInfo.Families.ValueRef(familyId, defaultFamilySettings); // Workaround for KIKIMR-17222
+
+ const auto* schemeGroupRoom = tableInfo.Rooms.FindPtr(family.Room);
+ Y_ABORT_UNLESS(schemeGroupRoom, "Cannot find room %" PRIu32 " in table %" PRIu32, family.Room, tableInfo.Id);
+
+ ui32 partGroupChannel = partView.Part->GetGroupChannel(NTable::NPage::TGroupId(index));
+ if (partGroupChannel != schemeGroupRoom->Main) {
+ return true;
+ }
+ }
+ }
+
+ { // Check columns
+ THashMap<NTable::TTag, ui32> partColumnGroups, schemeColumnGroups;
+ for (const auto& column : partView->Scheme->AllColumns) {
+ partColumnGroups[column.Tag] = column.Group;
+ }
+ for (const auto& col : rowScheme.Cols) {
+ schemeColumnGroups[col.Tag] = col.Group;
+ }
+ if (partColumnGroups != schemeColumnGroups) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)
{
if (auto logl = Logger->Log(ELnLev::Info))
@@ -4379,37 +4453,29 @@ ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)
for (size_t group : xrange(rowScheme->Families.size())) {
auto familyId = rowScheme->Families[group];
- const auto* family = tableInfo->Families.FindPtr(familyId);
- if (Y_UNLIKELY(!family)) {
- // FIXME: workaround for KIKIMR-17222
- // Column families with default settings may be missing in schema,
- // so we have to use a static variable as a substitute
- static const NTable::TScheme::TFamily defaultFamilySettings;
- family = &defaultFamilySettings;
- }
- Y_ABORT_UNLESS(family, "Cannot find family %" PRIu32 " in table %" PRIu32, familyId, table);
+ static const NTable::TScheme::TFamily defaultFamilySettings;
+ const auto& family = tableInfo->Families.ValueRef(familyId, defaultFamilySettings); // Workaround for KIKIMR-17222
- auto roomId = family->Room;
- auto* room = tableInfo->Rooms.FindPtr(roomId);
- Y_ABORT_UNLESS(room, "Cannot find room %" PRIu32 " in table %" PRIu32, roomId, table);
+ auto* room = tableInfo->Rooms.FindPtr(family.Room);
+ Y_ABORT_UNLESS(room, "Cannot find room %" PRIu32 " in table %" PRIu32, family.Room, table);
auto& pageGroup = comp->Layout.Groups.at(group);
auto& writeGroup = comp->Writer.Groups.at(group);
- pageGroup.Codec = family->Codec;
+ pageGroup.Codec = family.Codec;
pageGroup.PageSize = policy->MinDataPageSize;
pageGroup.BTreeIndexNodeTargetSize = policy->MinBTreeIndexNodeSize;
pageGroup.BTreeIndexNodeKeysMin = policy->MinBTreeIndexNodeKeys;
- writeGroup.Cache = Max(family->Cache, cache);
+ writeGroup.Cache = Max(family.Cache, cache);
writeGroup.MaxBlobSize = NBlockIO::BlockSize;
writeGroup.Channel = room->Main;
addChannel(room->Main);
if (group == 0) {
// Small/Large edges are taken from the leader family
- comp->Layout.SmallEdge = family->Small;
- comp->Layout.LargeEdge = family->Large;
+ comp->Layout.SmallEdge = family.Small;
+ comp->Layout.LargeEdge = family.Large;
// Small/Large channels are taken from the leader family
comp->Writer.BlobsChannels = room->Blobs;
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index 7c08925a47..5369017fe3 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -591,7 +591,7 @@ class TExecutor
ui64 OwnerTabletId() const override;
const NTable::TScheme& DatabaseScheme() override;
- TIntrusiveConstPtr<NTable::TRowScheme> RowScheme(ui32 table) override;
+ TIntrusiveConstPtr<NTable::TRowScheme> RowScheme(ui32 table) const override;
const NTable::TScheme::TTableInfo* TableScheme(ui32 table) override;
ui64 TableMemSize(ui32 table, NTable::TEpoch epoch) override;
NTable::TPartView TablePart(ui32 table, const TLogoBlobID& label) override;
@@ -652,6 +652,8 @@ public:
bool CancelScan(ui32 tableId, ui64 taskId) override;
TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 tableId) const override;
+ bool HasSchemaChanges(ui32 table) const override;
+ bool HasSchemaChanges(const NTable::TPartView& partView, const NTable::TScheme::TTableInfo& tableInfo, const NTable::TRowScheme& rowScheme) const;
ui64 CompactBorrowed(ui32 tableId) override;
ui64 CompactMemTable(ui32 tableId) override;
ui64 CompactTable(ui32 tableId) override;
diff --git a/ydb/core/tablet_flat/flat_stat_table.cpp b/ydb/core/tablet_flat/flat_stat_table.cpp
index afacbe7f37..5f74a4319f 100644
--- a/ydb/core/tablet_flat/flat_stat_table.cpp
+++ b/ydb/core/tablet_flat/flat_stat_table.cpp
@@ -23,14 +23,14 @@ bool BuildStats(const TSubset& subset, TStats& stats, ui64 rowCountResolution, u
}
void GetPartOwners(const TSubset& subset, THashSet<ui64>& partOwners) {
- for (auto& pi : subset.Flatten) {
- partOwners.insert(pi->Label.TabletID());
+ for (const auto& partView : subset.Flatten) {
+ partOwners.insert(partView->Label.TabletID());
}
- for (auto& pi : subset.ColdParts) {
- partOwners.insert(pi->Label.TabletID());
+ for (const auto& coldPart : subset.ColdParts) {
+ partOwners.insert(coldPart->Label.TabletID());
}
- for (auto& pi : subset.TxStatus) {
- partOwners.insert(pi->Label.TabletID());
+ for (const auto& txStatus : subset.TxStatus) {
+ partOwners.insert(txStatus->Label.TabletID());
}
}
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index fd210f3bca..8475ea7529 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -1398,13 +1398,12 @@ bool TTable::RemoveRowVersions(const TRowVersion& lower, const TRowVersion& uppe
TCompactionStats TTable::GetCompactionStats() const
{
- TCompactionStats stats;
- stats.MemRowCount = GetMemRowCount();
- stats.MemDataSize = GetMemSize();
- stats.MemDataWaste = GetMemWaste();
- stats.PartCount = Flatten.size() + ColdParts.size();
-
- return stats;
+ return {
+ .PartCount = Flatten.size() + ColdParts.size(),
+ .MemRowCount = GetMemRowCount(),
+ .MemDataSize = GetMemSize(),
+ .MemDataWaste = GetMemWaste(),
+ };
}
void TTable::SetTableObserver(TIntrusivePtr<ITableObserver> ptr) noexcept
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index e56860067f..eb6ae0abcf 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -585,6 +585,7 @@ namespace NFlatExecutorSetup {
// edge and ts of last full compaction
virtual TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 tableId) const = 0;
+ virtual bool HasSchemaChanges(ui32 table) const = 0;
// Forces full compaction of the specified table in the near future
// Returns 0 if can't compact, otherwise compaction ID
diff --git a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
index 8d466bd15b..25fefd79b4 100644
--- a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
+++ b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
@@ -47,7 +47,7 @@ public:
return DB.GetScheme();
}
- TIntrusiveConstPtr<NKikimr::NTable::TRowScheme> RowScheme(ui32 table) override {
+ TIntrusiveConstPtr<NKikimr::NTable::TRowScheme> RowScheme(ui32 table) const override {
return DB.GetRowScheme(table);
}
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 6771f4517e..a8c90003f0 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -138,9 +138,9 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll)
, CanCancelROWithReadSets(0, 0, 1)
, PerShardReadSizeLimit(5368709120, 0, 107374182400)
- , CpuUsageReportThreshlodPercent(60, -1, 146)
+ , CpuUsageReportThresholdPercent(60, -1, 146)
, CpuUsageReportIntervalSeconds(60, 0, 365*86400)
- , HighDataSizeReportThreshlodBytes(10ull<<30, -1, Max<i64>())
+ , HighDataSizeReportThresholdBytes(10ull<<30, -1, Max<i64>())
, HighDataSizeReportIntervalSeconds(60, 0, 365*86400)
, DataTxProfileLogThresholdMs(0, 0, 86400000)
, DataTxProfileBufferThresholdMs(0, 0, 86400000)
@@ -308,9 +308,9 @@ void TDataShard::IcbRegister() {
appData->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets");
appData->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit");
- appData->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
+ appData->Icb->RegisterSharedControl(CpuUsageReportThresholdPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
appData->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds");
- appData->Icb->RegisterSharedControl(HighDataSizeReportThreshlodBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
+ appData->Icb->RegisterSharedControl(HighDataSizeReportThresholdBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
appData->Icb->RegisterSharedControl(HighDataSizeReportIntervalSeconds, "DataShardControls.HighDataSizeReportIntervalSeconds");
appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp
index eefd431321..be8842c637 100644
--- a/ydb/core/tx/datashard/datashard__compaction.cpp
+++ b/ydb/core/tx/datashard/datashard__compaction.cpp
@@ -108,7 +108,8 @@ public:
auto stats = txc.DB.GetCompactionStats(localTid);
bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0;
bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0;
- if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) {
+ bool hasSchemaChanges = Self->Executor()->HasSchemaChanges(tableInfo.LocalTid);
+ if (isEmpty || isSingleParted && !hasBorrowed && !hasSchemaChanges && !record.GetCompactSinglePartedShards()) {
// nothing to compact
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Background compaction of tablet# " << Self->TabletID()
diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp
index 4246a54e32..f92acafecf 100644
--- a/ydb/core/tx/datashard/datashard__stats.cpp
+++ b/ydb/core/tx/datashard/datashard__stats.cpp
@@ -15,7 +15,24 @@ namespace NDataShard {
using namespace NResourceBroker;
using namespace NTable;
-class TTableStatsCoroBuilder : public TActorCoroImpl, private IPages {
+struct TTableStatsCoroBuilderArgs {
+ TActorId ReplyTo;
+ ui64 TabletId;
+ ui64 TableId;
+ TActorId ExecutorId;
+ ui64 IndexSize;
+ TAutoPtr<TSubset> Subset;
+ ui64 MemRowCount;
+ ui64 MemDataSize;
+ ui64 RowCountResolution;
+ ui64 DataSizeResolution;
+ ui32 HistogramBucketsCount;
+ ui64 SearchHeight;
+ bool HasSchemaChanges;
+ TInstant StatsUpdateTime;
+};
+
+class TTableStatsCoroBuilder : public TActorCoroImpl, private IPages, TTableStatsCoroBuilderArgs {
private:
using ECode = TDataShard::TEvPrivate::TEvTableStatsError::ECode;
@@ -40,23 +57,9 @@ private:
};
public:
- TTableStatsCoroBuilder(TActorId replyTo, ui64 tabletId, ui64 tableId, TActorId executorId, ui64 indexSize,
- const TAutoPtr<TSubset> subset, ui64 memRowCount, ui64 memDataSize,
- ui64 rowCountResolution, ui64 dataSizeResolution, ui32 histogramBucketsCount, ui64 searchHeight, TInstant statsUpdateTime)
+ TTableStatsCoroBuilder(TTableStatsCoroBuilderArgs args)
: TActorCoroImpl(/* stackSize */ 64_KB, /* allowUnhandledDtor */ true)
- , ReplyTo(replyTo)
- , TabletId(tabletId)
- , TableId(tableId)
- , ExecutorId(executorId)
- , IndexSize(indexSize)
- , StatsUpdateTime(statsUpdateTime)
- , Subset(subset)
- , MemRowCount(memRowCount)
- , MemDataSize(memDataSize)
- , RowCountResolution(rowCountResolution)
- , DataSizeResolution(dataSizeResolution)
- , HistogramBucketsCount(histogramBucketsCount)
- , SearchHeight(searchHeight)
+ , TTableStatsCoroBuilderArgs(args)
{}
void Run() override {
@@ -138,6 +141,7 @@ private:
ev->MemRowCount = MemRowCount;
ev->MemDataSize = MemDataSize;
ev->SearchHeight = SearchHeight;
+ ev->HasSchemaChanges = HasSchemaChanges;
GetPartOwners(*Subset, ev->PartOwners);
@@ -166,6 +170,7 @@ private:
LOG_DEBUG_S(GetActorContext(), NKikimrServices::TX_DATASHARD, "BuildStats result at datashard " << TabletId << ", for tableId " << TableId
<< ": RowCount " << ev->Stats.RowCount << ", DataSize " << ev->Stats.DataSize.Size << ", IndexSize " << ev->Stats.IndexSize.Size << ", PartCount " << ev->PartCount
<< (ev->PartOwners.size() > 1 || ev->PartOwners.size() == 1 && *ev->PartOwners.begin() != TabletId ? ", with borrowed parts" : "")
+ << (ev->HasSchemaChanges ? ", with schema changes" : "")
<< ", LoadedSize " << PagesSize << ", " << NFmt::Do(*Spent) << ", HistogramKeys " << ev->Stats.DataSizeHistogram.size());
Send(ReplyTo, ev.Release());
@@ -223,19 +228,6 @@ private:
Send(MakeResourceBrokerID(), new TEvResourceBroker::TEvFinishTask(/* task id */ 1, /* cancelled */ false));
}
- TActorId ReplyTo;
- ui64 TabletId;
- ui64 TableId;
- TActorId ExecutorId;
- ui64 IndexSize;
- TInstant StatsUpdateTime;
- TAutoPtr<TSubset> Subset;
- ui64 MemRowCount;
- ui64 MemDataSize;
- ui64 RowCountResolution;
- ui64 DataSizeResolution;
- ui32 HistogramBucketsCount;
- ui64 SearchHeight;
THashMap<const TPart*, THashMap<TPageId, TSharedData>> Pages;
ui64 PagesSize = 0;
ui64 CoroutineDeadline;
@@ -308,6 +300,7 @@ public:
Result->Record.MutableTableStats()->SetPartCount(tableInfo.Stats.PartCount);
Result->Record.MutableTableStats()->SetSearchHeight(tableInfo.Stats.SearchHeight);
+ Result->Record.MutableTableStats()->SetHasSchemaChanges(tableInfo.Stats.HasSchemaChanges);
Result->Record.MutableTableStats()->SetLastFullCompactionTs(tableInfo.Stats.LastFullCompaction.Seconds());
Result->Record.MutableTableStats()->SetHasLoanedParts(Self->Executor()->HasLoanedParts());
@@ -363,38 +356,36 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo
Actors.erase(ev->Sender);
ui64 tableId = ev->Get()->TableId;
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "BuildStats result received at datashard " << TabletID() << ", for tableId " << tableId);
- i64 dataSize = 0;
- if (TableInfos.contains(tableId)) {
- const TUserTable& tableInfo = *TableInfos[tableId];
+ if (!TableInfos.contains(tableId)) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "BuildStats result dropped at datashard " << TabletID()
+ << ", built for tableId " << tableId << ", but table is gone (moved ot dropped)");
+ return;
+ }
- if (!tableInfo.StatsUpdateInProgress) {
- // How can this happen?
- LOG_ERROR(ctx, NKikimrServices::TX_DATASHARD,
- "Unexpected async stats update at datashard %" PRIu64, TabletID());
- }
- tableInfo.Stats.DataStats = std::move(ev->Get()->Stats);
- tableInfo.Stats.PartOwners = std::move(ev->Get()->PartOwners);
- tableInfo.Stats.PartCount = ev->Get()->PartCount;
- tableInfo.Stats.StatsUpdateTime = ev->Get()->StatsUpdateTime;
- tableInfo.Stats.MemRowCount = ev->Get()->MemRowCount;
- tableInfo.Stats.MemDataSize = ev->Get()->MemDataSize;
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "BuildStats result received at datashard " << TabletID()
+ << ", for tableId " << tableId);
- dataSize += tableInfo.Stats.DataStats.DataSize.Size;
+ const TUserTable& tableInfo = *TableInfos[tableId];
- tableInfo.Stats.SearchHeight = ev->Get()->SearchHeight;
+ if (!tableInfo.StatsUpdateInProgress) { // how can this happen?
+ LOG_ERROR(ctx, NKikimrServices::TX_DATASHARD, "Unexpected async stats update at datashard %" PRIu64, TabletID());
+ }
- tableInfo.StatsUpdateInProgress = false;
+ tableInfo.Stats.DataStats = std::move(ev->Get()->Stats);
+ tableInfo.Stats.PartOwners = std::move(ev->Get()->PartOwners);
+ tableInfo.Stats.PartCount = ev->Get()->PartCount;
+ tableInfo.Stats.StatsUpdateTime = ev->Get()->StatsUpdateTime;
+ tableInfo.Stats.MemRowCount = ev->Get()->MemRowCount;
+ tableInfo.Stats.MemDataSize = ev->Get()->MemDataSize;
+ tableInfo.Stats.SearchHeight = ev->Get()->SearchHeight;
+ tableInfo.Stats.HasSchemaChanges = ev->Get()->HasSchemaChanges;
- SendPeriodicTableStats(ctx);
+ tableInfo.StatsUpdateInProgress = false;
- } else {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Drop stats at datashard " << TabletID()
- << ", built for tableId " << tableId << ", but table is gone (moved ot dropped)");
- }
+ SendPeriodicTableStats(ctx);
- if (dataSize > HighDataSizeReportThreshlodBytes) {
+ if (static_cast<i64>(tableInfo.Stats.DataStats.DataSize.Size) > HighDataSizeReportThresholdBytes) {
TInstant now = AppData(ctx)->TimeProvider->Now();
if (LastDataSizeWarnTime + TDuration::Seconds(HighDataSizeReportIntervalSeconds) > now)
@@ -405,11 +396,11 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo
TStringBuilder names;
ListTableNames(GetUserTables(), names);
- LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Data size " << dataSize
- << " is higher than threshold of " << (i64)HighDataSizeReportThreshlodBytes
- << " at datashard: " << TabletID()
- << " table: " << names
- << " consider reconfiguring table partitioning settings");
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Data size " << tableInfo.Stats.DataStats.DataSize.Size
+ << " is higher than threshold of " << (i64)HighDataSizeReportThresholdBytes
+ << " at datashard: " << TabletID()
+ << " table: " << names
+ << " consider reconfiguring table partitioning settings");
}
}
@@ -470,6 +461,7 @@ public:
for (auto& ti : Self->TableInfos) {
const ui32 localTableId = ti.second->LocalTid;
const ui32 shadowTableId = ti.second->ShadowTid;
+ const ui64 tableId = ti.first;
CheckIdleMemCompaction(*ti.second, txc, ctx);
@@ -488,21 +480,28 @@ public:
searchHeight = 0;
}
+ // Note: ignore shadow table for simplicity
+ bool hasSchemaChanges = Self->Executor()->HasSchemaChanges(localTableId);
+
if (!ti.second->StatsNeedUpdate) {
- ti.second->Stats.MemRowCount = memRowCount;
- ti.second->Stats.MemDataSize = memDataSize;
- ti.second->Stats.SearchHeight = searchHeight;
+ auto& stats = ti.second->Stats;
+ stats.MemRowCount = memRowCount;
+ stats.MemDataSize = memDataSize;
+ stats.SearchHeight = searchHeight;
+ stats.HasSchemaChanges = hasSchemaChanges;
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "BuildStats skipped at datashard " << Self->TabletID() << ", for tableId " << tableId
+ << ": RowCount " << stats.DataStats.RowCount << ", DataSize " << stats.DataStats.DataSize.Size << ", IndexSize " << stats.DataStats.IndexSize.Size << ", PartCount " << stats.PartCount
+ << (stats.HasSchemaChanges ? ", with schema changes" : ""));
+
continue;
}
const ui32 MaxBuckets = 500;
-
- ui64 tableId = ti.first;
ui64 rowCountResolution = gDbStatsRowCountResolution;
ui64 dataSizeResolution = gDbStatsDataSizeResolution;
ui32 histogramBucketsCount = gDbStatsHistogramBucketsCount;
-
if (ti.second->Stats.DataSizeResolution &&
ti.second->Stats.DataStats.DataSize.Size / ti.second->Stats.DataSizeResolution <= MaxBuckets)
{
@@ -547,19 +546,23 @@ public:
shadowSubset->ColdParts.end());
}
- auto builder = new TActorCoro(MakeHolder<TTableStatsCoroBuilder>(ctx.SelfID,
- Self->TabletID(),
- tableId,
- Self->ExecutorID(),
- indexSize,
- subsetForStats,
- memRowCount,
- memDataSize,
- rowCountResolution,
- dataSizeResolution,
- histogramBucketsCount,
- searchHeight,
- AppData(ctx)->TimeProvider->Now()), NKikimrServices::TActivity::DATASHARD_STATS_BUILDER);
+ auto builder = new TActorCoro(MakeHolder<TTableStatsCoroBuilder>(
+ TTableStatsCoroBuilderArgs{
+ .ReplyTo = ctx.SelfID,
+ .TabletId = Self->TabletID(),
+ .TableId = tableId,
+ .ExecutorId = Self->ExecutorID(),
+ .IndexSize = indexSize,
+ .Subset = subsetForStats,
+ .MemRowCount = memRowCount,
+ .MemDataSize = memDataSize,
+ .RowCountResolution = rowCountResolution,
+ .DataSizeResolution = dataSizeResolution,
+ .HistogramBucketsCount = histogramBucketsCount,
+ .SearchHeight = searchHeight,
+ .HasSchemaChanges = hasSchemaChanges,
+ .StatsUpdateTime = AppData(ctx)->TimeProvider->Now()
+ }), NKikimrServices::TActivity::DATASHARD_STATS_BUILDER);
TActorId actorId = ctx.Register(builder, TMailboxType::HTSwap, AppData(ctx)->BatchPoolId);
Self->Actors.insert(actorId);
@@ -614,7 +617,7 @@ void TDataShard::CollectCpuUsage(const TActorContext &ctx) {
ui64 cpuUsec = metrics->CPU.GetValue();
float cpuPercent = cpuUsec / 10000.0;
- if (cpuPercent > CpuUsageReportThreshlodPercent) {
+ if (cpuPercent > CpuUsageReportThresholdPercent) {
if (LastCpuWarnTime + TDuration::Seconds(CpuUsageReportIntervalSeconds) > now)
return;
@@ -624,7 +627,7 @@ void TDataShard::CollectCpuUsage(const TActorContext &ctx) {
ListTableNames(GetUserTables(), names);
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "CPU usage " << cpuPercent
- << "% is higher than threshold of " << (i64)CpuUsageReportThreshlodPercent
+ << "% is higher than threshold of " << (i64)CpuUsageReportThresholdPercent
<< "% in-flight Tx: " << TxInFly()
<< " immediate Tx: " << ImmediateInFly()
<< " readIterators: " << ReadIteratorsInFly()
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index a5c275c22f..aab59d0742 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -387,6 +387,7 @@ class TDataShard
ui64 MemRowCount = 0;
ui64 MemDataSize = 0;
ui64 SearchHeight = 0;
+ bool HasSchemaChanges = false;
};
struct TEvTableStatsError : public TEventLocal<TEvTableStatsError, EvTableStatsError> {
@@ -2726,9 +2727,9 @@ private:
TControlWrapper MaxTxLagMilliseconds;
TControlWrapper CanCancelROWithReadSets;
TControlWrapper PerShardReadSizeLimit;
- TControlWrapper CpuUsageReportThreshlodPercent;
+ TControlWrapper CpuUsageReportThresholdPercent;
TControlWrapper CpuUsageReportIntervalSeconds;
- TControlWrapper HighDataSizeReportThreshlodBytes;
+ TControlWrapper HighDataSizeReportThresholdBytes;
TControlWrapper HighDataSizeReportIntervalSeconds;
TControlWrapper DataTxProfileLogThresholdMs;
@@ -3299,6 +3300,7 @@ protected:
ev->Record.MutableTableStats()->SetPartCount(ti.Stats.PartCount);
ev->Record.MutableTableStats()->SetSearchHeight(ti.Stats.SearchHeight);
+ ev->Record.MutableTableStats()->SetHasSchemaChanges(ti.Stats.HasSchemaChanges);
ev->Record.MutableTableStats()->SetLastFullCompactionTs(ti.Stats.LastFullCompaction.Seconds());
ev->Record.MutableTableStats()->SetHasLoanedParts(Executor()->HasLoanedParts());
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 2feea92c97..9845ef7f8d 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -394,6 +394,7 @@ struct TUserTable : public TThrRefBase {
THashSet<ui64> PartOwners;
ui64 PartCount = 0;
ui64 SearchHeight = 0;
+ bool HasSchemaChanges = false;
TInstant StatsUpdateTime;
ui64 DataSizeResolution = 0;
ui64 RowCountResolution = 0;
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 7384a1f43a..5ac95e737f 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -3591,7 +3591,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
if (Commit)
expectedRowCount += writeCount * rowCount;
- ui64 statRowCount = WaitTableStats(*runtime, tabletId, 0, expectedRowCount).GetTableStats().GetRowCount();
+ ui64 statRowCount = WaitTableStats(*runtime, tabletId, [expectedRowCount](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetRowCount() >= expectedRowCount;
+ }).GetTableStats().GetRowCount();
UNIT_ASSERT_VALUES_EQUAL(statRowCount, expectedRowCount);
}
}
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
index 8b845b0f4b..ff081be3e7 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp
@@ -214,7 +214,9 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1);
+ auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= 1;
+ });
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 10);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
@@ -522,7 +524,9 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1);
+ auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= 1;
+ });
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 10);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
@@ -591,7 +595,9 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1);
+ auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= 1;
+ });
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), compactedPart);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
@@ -683,7 +689,9 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) {
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1);
+ auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= 1;
+ });
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 10);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
diff --git a/ydb/core/tx/datashard/datashard_ut_stats.cpp b/ydb/core/tx/datashard/datashard_ut_stats.cpp
index 14a6fa25e3..2ce191382e 100644
--- a/ydb/core/tx/datashard/datashard_ut_stats.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_stats.cpp
@@ -21,6 +21,32 @@ namespace {
}
ExecSQL(server, sender, query);
}
+
+ std::function<bool(const NKikimrTableStats::TTableStats& stats)> HasPartCountCondition(ui64 count) {
+ return [count](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= count;
+ };
+ }
+
+ std::function<bool(const NKikimrTableStats::TTableStats& stats)> HasRowCountCondition(ui64 count) {
+ return [count](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetRowCount() >= count;
+ };
+ }
+
+ std::function<bool(const NKikimrTableStats::TTableStats& stats)> HasSchemaChangesCondition() {
+ Cerr << "waiting for schema changes" << Endl;
+ return [](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetHasSchemaChanges();
+ };
+ }
+
+ std::function<bool(const NKikimrTableStats::TTableStats& stats)> DoesNotHaveSchemaChangesCondition() {
+ Cerr << "waiting for no schema changes" << Endl;
+ return [](const NKikimrTableStats::TTableStats& stats) {
+ return !stats.GetHasSchemaChanges();
+ };
+ }
}
Y_UNIT_TEST_SUITE(DataShardStats) {
@@ -80,7 +106,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1);
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1));
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
@@ -144,7 +170,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1);
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1));
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
@@ -201,7 +227,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1);
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1));
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 2000);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
@@ -283,7 +309,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1);
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1));
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
@@ -340,7 +366,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
CompactTable(runtime, shard1, tableId1, false);
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 1, (batch + 1) * batchItems);
+ auto stats = WaitTableStats(runtime, shard1, HasRowCountCondition((batch + 1) * batchItems));
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), (batch + 1) * batchItems);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
@@ -403,7 +429,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5), (6, 6), (7, 7), (8, 8)");
{
Cerr << "... waiting for stats" << Endl;
- auto stats = WaitTableStats(runtime, shard1, 2);
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(2));
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 7);
@@ -490,7 +516,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2);");
- {
+ {
Cerr << "... waiting leader stats" << Endl;
auto stats = WaitTableStats(runtime, shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
@@ -514,8 +540,199 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
UNIT_ASSERT_LE(stats.GetFollowerId(), 3);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRangeReadRows(), 2);
}
- }
+ }
+
+ Y_UNIT_TEST(HasSchemaChanges_BTreeIndex) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ ui64 shard1 = shards.at(0);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");
+
+ CompactTable(runtime, shard1, tableId1, false);
+ {
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetHasSchemaChanges(), false);
+ }
+
+ runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(true);
+ WaitTableStats(runtime, shard1, HasSchemaChangesCondition());
+ CompactTable(runtime, shard1, tableId1, false);
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+
+ runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false);
+ // turn off doesn't trigger compaction:
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+ // even after restart:
+ RebootTablet(runtime, shard1, sender);
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+ }
+
+ Y_UNIT_TEST(HasSchemaChanges_ByKeyFilter) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ ui64 shard1 = shards.at(0);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");
+
+ CompactTable(runtime, shard1, tableId1, false);
+
+ {
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetHasSchemaChanges(), false);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetByKeyFilterSize(), 0);
+ }
-} // Y_UNIT_TEST_SUITE(DataShardStats)
+ WaitTxNotification(server, sender,
+ AsyncSetEnableFilterByKey(server, "/Root", "table-1", true));
+ {
+ auto stats = WaitTableStats(runtime, shard1, HasSchemaChangesCondition());
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetByKeyFilterSize(), 0);
+ }
+ CompactTable(runtime, shard1, tableId1, false);
+ {
+ auto stats = WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+ UNIT_ASSERT_GT(stats.GetTableStats().GetByKeyFilterSize(), 0);
+ }
-} // namespace NKikimr
+ WaitTxNotification(server, sender,
+ AsyncSetEnableFilterByKey(server, "/Root", "table-1", false));
+ {
+ auto stats = WaitTableStats(runtime, shard1, HasSchemaChangesCondition());
+ UNIT_ASSERT_GT(stats.GetTableStats().GetByKeyFilterSize(), 0);
+ }
+ CompactTable(runtime, shard1, tableId1, false);
+ {
+ auto stats = WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetByKeyFilterSize(), 0);
+ }
+ }
+
+ Y_UNIT_TEST(HasSchemaChanges_Columns) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ ui64 shard1 = shards.at(0);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");
+
+ CompactTable(runtime, shard1, tableId1, false);
+
+ {
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetHasSchemaChanges(), false);
+ }
+
+ WaitTxNotification(server, sender,
+ AsyncAlterAddExtraColumn(server, "/Root", "table-1"));
+ WaitTableStats(runtime, shard1, HasSchemaChangesCondition());
+ CompactTable(runtime, shard1, tableId1, false);
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+
+ WaitTxNotification(server, sender,
+ AsyncAlterDropColumn(server, "/Root", "table-1", "extra"));
+ WaitTableStats(runtime, shard1, HasSchemaChangesCondition());
+ CompactTable(runtime, shard1, tableId1, false);
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+ }
+
+ Y_UNIT_TEST(HasSchemaChanges_Families) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .AddStoragePool("ssd")
+ .AddStoragePool("hdd");
+
+ TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false}})
+ .Families({{.Name = "default", .LogPoolKind = "ssd", .SysLogPoolKind = "ssd", .DataPoolKind = "ssd"}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");
+
+ WaitTxNotification(server, sender,
+ AsyncAlterAddExtraColumn(server, "/Root", "table-1"));
+
+ CompactTable(runtime, shard1, tableId1, false);
+ {
+ auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetHasSchemaChanges(), false);
+ }
+
+ WaitTxNotification(server, sender,
+ AsyncSetColumnFamily(server, "/Root", "table-1", "value2", {.Name = "hdd", .DataPoolKind = "hdd"}));
+ WaitTableStats(runtime, shard1, HasSchemaChangesCondition());
+ CompactTable(runtime, shard1, tableId1, false);
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+
+ WaitTxNotification(server, sender,
+ AsyncSetColumnFamily(server, "/Root", "table-1", "extra", {.Name = "hdd", .DataPoolKind = "hdd"}));
+ WaitTableStats(runtime, shard1, HasSchemaChangesCondition());
+ CompactTable(runtime, shard1, tableId1, false);
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+
+ WaitTxNotification(server, sender,
+ AsyncSetColumnFamily(server, "/Root", "table-1", "extra", {.Name = "default", .DataPoolKind = "ssd"}));
+ WaitTableStats(runtime, shard1, HasSchemaChangesCondition());
+ CompactTable(runtime, shard1, tableId1, false);
+ WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition());
+ }
+
+} // Y_UNIT_TEST_SUITE(DataShardStats)
+
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
index 17cdf0d6eb..b840ed0752 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
@@ -1657,6 +1657,47 @@ ui64 AsyncAlterDropColumn(
return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
+ui64 AsyncSetEnableFilterByKey(
+ Tests::TServer::TPtr server,
+ const TString& workingDir,
+ const TString& name,
+ bool value)
+{
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable();
+ desc.SetName(name);
+ desc.MutablePartitionConfig()->SetEnableFilterByKey(value);
+
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
+}
+
+ui64 AsyncSetColumnFamily(
+ Tests::TServer::TPtr server,
+ const TString& workingDir,
+ const TString& name,
+ const TString& colName,
+ TShardedTableOptions::TFamily family)
+{
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable();
+ desc.SetName(name);
+
+ auto col = desc.AddColumns();
+ col->SetName(colName);
+ col->SetFamilyName(family.Name);
+
+ auto fam = desc.MutablePartitionConfig()->AddColumnFamilies();
+ if (family.Name) fam->SetName(family.Name);
+ if (family.LogPoolKind) fam->MutableStorageConfig()->MutableLog()->SetPreferredPoolKind(family.LogPoolKind);
+ if (family.SysLogPoolKind) fam->MutableStorageConfig()->MutableSysLog()->SetPreferredPoolKind(family.SysLogPoolKind);
+ if (family.DataPoolKind) fam->MutableStorageConfig()->MutableData()->SetPreferredPoolKind(family.DataPoolKind);
+ if (family.ExternalPoolKind) fam->MutableStorageConfig()->MutableExternal()->SetPreferredPoolKind(family.ExternalPoolKind);
+ if (family.DataThreshold) fam->MutableStorageConfig()->SetDataThreshold(family.DataThreshold);
+ if (family.ExternalThreshold) fam->MutableStorageConfig()->SetExternalThreshold(family.ExternalThreshold);
+
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
+}
+
ui64 AsyncAlterAndDisableShadow(
Tests::TServer::TPtr server,
const TString& workingDir,
@@ -1903,7 +1944,9 @@ void WaitTableStatsImpl(TTestActorRuntime& runtime,
UNIT_ASSERT(captured);
}
-NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 minRowReads, ui64 minRangeReadRows) {
+NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRuntime& runtime, ui64 datashardId,
+ std::function<bool(const NKikimrTableStats::TTableStats& stats)> condition)
+{
NKikimrTxDataShard::TEvPeriodicTableStats stats;
bool captured = false;
@@ -1914,10 +1957,10 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRunti
if (!record.GetFollowerId())
return;
- if (record.GetDatashardId() != tabletId)
+ if (record.GetDatashardId() != datashardId)
return;
- if (record.GetTableStats().GetRowReads() < minRowReads || record.GetTableStats().GetRangeReadRows() < minRangeReadRows)
+ if (!condition(record.GetTableStats()))
return;
stats = record;
@@ -1929,7 +1972,9 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRunti
}
-NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 minPartCount, ui64 minRows) {
+NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 datashardId,
+ std::function<bool(const NKikimrTableStats::TTableStats& stats)> condition)
+{
NKikimrTxDataShard::TEvPeriodicTableStats stats;
bool captured = false;
@@ -1940,10 +1985,10 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runt
if (record.GetFollowerId())
return;
- if (record.GetDatashardId() != tabletId)
+ if (record.GetDatashardId() != datashardId)
return;
- if (record.GetTableStats().GetPartCount() < minPartCount || record.GetTableStats().GetRowCount() < minRows)
+ if (!condition(record.GetTableStats()))
return;
stats = record;
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
index 4aa4f18a01..fde8d57384 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
@@ -636,6 +636,19 @@ ui64 AsyncAlterDropColumn(
const TString& name,
const TString& colName);
+ui64 AsyncSetEnableFilterByKey(
+ Tests::TServer::TPtr server,
+ const TString& workingDir,
+ const TString& name,
+ bool value);
+
+ui64 AsyncSetColumnFamily(
+ Tests::TServer::TPtr server,
+ const TString& workingDir,
+ const TString& name,
+ const TString& colName,
+ TShardedTableOptions::TFamily family);
+
ui64 AsyncAlterAndDisableShadow(
Tests::TServer::TPtr server,
const TString& workingDir,
@@ -724,8 +737,10 @@ TString ReadShardedTable(
void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId);
void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId);
-NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 minPartCount = 0, ui64 minRows = 0);
-NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 minRowReads = 0, ui64 minRangeReadRows = 0);
+NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 datashardId,
+ std::function<bool(const NKikimrTableStats::TTableStats& stats)> condition = [](const NKikimrTableStats::TTableStats&)->bool{return true;});
+NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRuntime& runtime, ui64 datashardId,
+ std::function<bool(const NKikimrTableStats::TTableStats& stats)> condition = [](const NKikimrTableStats::TTableStats&)->bool{return true;});
void SimulateSleep(Tests::TServer::TPtr server, TDuration duration);
void SimulateSleep(TTestActorRuntime& runtime, TDuration duration);
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index 5dd37e7cab..c17056ff8e 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -113,8 +113,8 @@ struct TShardCompactionInfo {
ui64 LastFullCompactionTs = 0;
ui64 RowCount = 0;
ui64 RowDeletes = 0;
-
ui64 PartCount = 0;
+ bool HasSchemaChanges = false;
explicit TShardCompactionInfo(const TShardIdx& id)
: ShardIdx(id)
@@ -127,6 +127,7 @@ struct TShardCompactionInfo {
, RowCount(stats.RowCount)
, RowDeletes(stats.RowDeletes)
, PartCount(stats.PartCount)
+ , HasSchemaChanges(stats.HasSchemaChanges)
{}
TShardCompactionInfo(const TShardCompactionInfo&) = default;
@@ -262,7 +263,8 @@ public:
// ignore single parted shard if needed
bool isSingleParted = info.PartCount == 1;
- if (!Config.CompactSinglePartedShards && isSingleParted)
+ bool hasSchemaChanges = info.HasSchemaChanges;
+ if (!Config.CompactSinglePartedShards && isSingleParted && !hasSchemaChanges)
return false;
if (info.RowCount < Config.RowCountThreshold)
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index e3cc67d2b6..ef02421c39 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -212,6 +212,7 @@ TPartitionStats TTxStoreTableStats::PrepareStats(const TActorContext& ctx,
newStats.FullCompactionTs = tableStats.GetLastFullCompactionTs();
newStats.MemDataSize = tableStats.GetInMemSize();
newStats.StartTime = TInstant::MilliSeconds(rec.GetStartTime());
+ newStats.HasSchemaChanges = tableStats.GetHasSchemaChanges();
newStats.HasLoanedData = tableStats.GetHasLoanedParts();
for (ui64 tabletId : rec.GetUserTablePartOwners()) {
newStats.PartOwners.insert(TTabletId(tabletId));
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index d3fdc89e67..8a04416a2b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -278,6 +278,8 @@ struct TPartitionStats {
// True when lent parts to other tablets
bool HasLoanedData = false;
+ bool HasSchemaChanges = false;
+
// Tablet actor started at
TInstant StartTime;
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 7ca17066cb..b11c249bf2 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -42,6 +42,7 @@ static void FillTableStats(NKikimrTableStats::TTableStats* stats, const TPartiti
stats->SetRangeReadRows(tableStats.RangeReadRows);
stats->SetPartCount(tableStats.PartCount);
+ stats->SetHasSchemaChanges(tableStats.HasSchemaChanges);
auto* storagePoolsStats = stats->MutableStoragePools()->MutablePoolsUsage();
for (const auto& [poolKind, stats] : tableStats.StoragePoolsStats) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 0e8f3b31ea..efcf6a89e5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -377,7 +377,7 @@ struct Schema : NIceDb::Schema {
struct FullCompactionTs : Column<31, NScheme::NTypeIds::Uint64> { static constexpr ui64 Default = 0; };
struct MemDataSize : Column<32, NScheme::NTypeIds::Uint64> { static constexpr ui64 Default = 0; };
- // PartCount, PartOwners & ShardState are volatile data
+ // PartCount, PartOwners, ShardState, HasSchemaChanges are volatile data
// Represented by NKikimrTableStats::TStoragePoolsStats.
struct StoragePoolsStats : Column<33, NScheme::NTypeIds::String> { using Type = TString; };
diff --git a/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp
index 8297cc3b6f..bd90a89af2 100644
--- a/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp
+++ b/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp
@@ -659,7 +659,9 @@ Y_UNIT_TEST_SUITE(TStoragePoolsStatsPersistence) {
NKikimrTxDataShard::TEvCompactTableResult::OK
);
// we wait for at least 1 part count, because it signals that the stats have been recalculated after compaction
- WaitTableStats(runtime, datashard, 1, rowsCount).GetTableStats();
+ WaitTableStats(runtime, datashard, [&](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= 1 && stats.GetRowCount() >= rowsCount; // keep the row-count guard the old 4-argument call had
+ });
auto checkUsage = [&poolsKinds](ui64 totalUsage, const auto& poolUsage) {
if (IsIn(poolsKinds, poolUsage.GetPoolKind())) {
diff --git a/ydb/services/ydb/ydb_ut.cpp b/ydb/services/ydb/ydb_ut.cpp
index 8dc54a48b4..c8ecc4e180 100644
--- a/ydb/services/ydb/ydb_ut.cpp
+++ b/ydb/services/ydb/ydb_ut.cpp
@@ -5700,7 +5700,9 @@ Y_UNIT_TEST(DisableWritesToDatabase) {
// try upsert when the feature flag is enabled
{
runtime.GetAppData().FeatureFlags.SetEnableSeparateDiskSpaceQuotas(true);
- WaitTableStats(runtime, datashard, 1);
+ WaitTableStats(runtime, datashard, [](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= 1;
+ });
upsert(table, "2u, \"Bar\"", Ydb::StatusIds::UNAVAILABLE);
checkDatabaseState(tenantPath, true);
}
@@ -5708,7 +5710,9 @@ Y_UNIT_TEST(DisableWritesToDatabase) {
// try upsert when the feature flag is disabled
{
runtime.GetAppData().FeatureFlags.SetEnableSeparateDiskSpaceQuotas(false);
- WaitTableStats(runtime, datashard, 1);
+ WaitTableStats(runtime, datashard, [](const NKikimrTableStats::TTableStats& stats) {
+ return stats.GetPartCount() >= 1;
+ });
upsert(table, "2u, \"Bar\"");
checkDatabaseState(tenantPath, false);
}