diff options
author | kungurtsev <kungasc@ydb.tech> | 2024-11-12 13:02:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 15:02:43 +0300 |
commit | edeb17d54b12a0a225e43f890a31f18208560c0c (patch) | |
tree | 4e79ea762e365c41ed6c0064770fe01025a6b176 | |
parent | 91cc64219e78a1b47be64f2b815027c1fcdd3f1c (diff) | |
download | ydb-edeb17d54b12a0a225e43f890a31f18208560c0c.tar.gz |
Request DataShard compaction if scheme has been changed (#11147)
25 files changed, 527 insertions, 149 deletions
diff --git a/ydb/core/protos/table_stats.proto b/ydb/core/protos/table_stats.proto index c80e972335..3958b1b221 100644 --- a/ydb/core/protos/table_stats.proto +++ b/ydb/core/protos/table_stats.proto @@ -68,4 +68,8 @@ message TTableStats { optional TStoragePoolsStats StoragePools = 31; optional uint64 ByKeyFilterSize = 32; + + // denotes that datashard should be background compacted + // even if it is single parted + optional bool HasSchemaChanges = 33; } diff --git a/ydb/core/tablet_flat/flat_comp.h b/ydb/core/tablet_flat/flat_comp.h index 5b49992129..923624d041 100644 --- a/ydb/core/tablet_flat/flat_comp.h +++ b/ydb/core/tablet_flat/flat_comp.h @@ -166,7 +166,7 @@ namespace NTable { /** * Returns row schema of the specified table */ - virtual TIntrusiveConstPtr<TRowScheme> RowScheme(ui32 table) = 0; + virtual TIntrusiveConstPtr<TRowScheme> RowScheme(ui32 table) const = 0; /** * Returns schema of the specified table diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index bd06779142..ffa0d56b43 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -4293,7 +4293,7 @@ const NTable::TScheme& TExecutor::DatabaseScheme() return Scheme(); } -TIntrusiveConstPtr<NTable::TRowScheme> TExecutor::RowScheme(ui32 table) +TIntrusiveConstPtr<NTable::TRowScheme> TExecutor::RowScheme(ui32 table) const { return Database->GetRowScheme(table); } @@ -4334,6 +4334,80 @@ const NTable::TRowVersionRanges& TExecutor::TableRemovedRowVersions(ui32 table) return Database->GetRemovedRowVersions(table); } +bool TExecutor::HasSchemaChanges(ui32 table) const { + auto *tableInfo = Scheme().GetTableInfo(table); + auto rowScheme = RowScheme(table); + if (!tableInfo || !rowScheme) { + return false; + } + + auto subset = Database->Subset(table, NTable::TEpoch::Max(), { } , { }); + for (const auto& partView : subset->Flatten) { + if (HasSchemaChanges(partView, *tableInfo, *rowScheme)) { + return true; + } + } + + 
return false; +} + +bool TExecutor::HasSchemaChanges(const NTable::TPartView& partView, const NTable::TScheme::TTableInfo& tableInfo, const NTable::TRowScheme& rowScheme) const { + if (partView.Part->Stat.Rows == 0) { + return false; + } + + { // Check by key filter existence + bool partByKeyFilter = bool(partView->ByKey); + bool schemeByKeyFilter = tableInfo.ByKeyFilter; + if (partByKeyFilter != schemeByKeyFilter) { + return true; + } + } + + { // Check B-Tree index existence + if (AppData()->FeatureFlags.GetEnableLocalDBBtreeIndex() && !partView->IndexPages.HasBTree()) { + return true; + } + } + + { // Check families + size_t partFamiliesCount = partView->GroupsCount; + size_t schemeFamiliesCount = rowScheme.Families.size(); + if (partFamiliesCount != schemeFamiliesCount) { + return true; + } + + for (size_t index : xrange(rowScheme.Families.size())) { + auto familyId = rowScheme.Families[index]; + static const NTable::TScheme::TFamily defaultFamilySettings; + const auto& family = tableInfo.Families.ValueRef(familyId, defaultFamilySettings); // Workaround for KIKIMR-17222 + + const auto* schemeGroupRoom = tableInfo.Rooms.FindPtr(family.Room); + Y_ABORT_UNLESS(schemeGroupRoom, "Cannot find room %" PRIu32 " in table %" PRIu32, family.Room, tableInfo.Id); + + ui32 partGroupChannel = partView.Part->GetGroupChannel(NTable::NPage::TGroupId(index)); + if (partGroupChannel != schemeGroupRoom->Main) { + return true; + } + } + } + + { // Check columns + THashMap<NTable::TTag, ui32> partColumnGroups, schemeColumnGroups; + for (const auto& column : partView->Scheme->AllColumns) { + partColumnGroups[column.Tag] = column.Group; + } + for (const auto& col : rowScheme.Cols) { + schemeColumnGroups[col.Tag] = col.Group; + } + if (partColumnGroups != schemeColumnGroups) { + return true; + } + } + + return false; +} + ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params) { if (auto logl = Logger->Log(ELnLev::Info)) @@ -4379,37 +4453,29 @@ ui64 
TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params) for (size_t group : xrange(rowScheme->Families.size())) { auto familyId = rowScheme->Families[group]; - const auto* family = tableInfo->Families.FindPtr(familyId); - if (Y_UNLIKELY(!family)) { - // FIXME: workaround for KIKIMR-17222 - // Column families with default settings may be missing in schema, - // so we have to use a static variable as a substitute - static const NTable::TScheme::TFamily defaultFamilySettings; - family = &defaultFamilySettings; - } - Y_ABORT_UNLESS(family, "Cannot find family %" PRIu32 " in table %" PRIu32, familyId, table); + static const NTable::TScheme::TFamily defaultFamilySettings; + const auto& family = tableInfo->Families.ValueRef(familyId, defaultFamilySettings); // Workaround for KIKIMR-17222 - auto roomId = family->Room; - auto* room = tableInfo->Rooms.FindPtr(roomId); - Y_ABORT_UNLESS(room, "Cannot find room %" PRIu32 " in table %" PRIu32, roomId, table); + auto* room = tableInfo->Rooms.FindPtr(family.Room); + Y_ABORT_UNLESS(room, "Cannot find room %" PRIu32 " in table %" PRIu32, family.Room, table); auto& pageGroup = comp->Layout.Groups.at(group); auto& writeGroup = comp->Writer.Groups.at(group); - pageGroup.Codec = family->Codec; + pageGroup.Codec = family.Codec; pageGroup.PageSize = policy->MinDataPageSize; pageGroup.BTreeIndexNodeTargetSize = policy->MinBTreeIndexNodeSize; pageGroup.BTreeIndexNodeKeysMin = policy->MinBTreeIndexNodeKeys; - writeGroup.Cache = Max(family->Cache, cache); + writeGroup.Cache = Max(family.Cache, cache); writeGroup.MaxBlobSize = NBlockIO::BlockSize; writeGroup.Channel = room->Main; addChannel(room->Main); if (group == 0) { // Small/Large edges are taken from the leader family - comp->Layout.SmallEdge = family->Small; - comp->Layout.LargeEdge = family->Large; + comp->Layout.SmallEdge = family.Small; + comp->Layout.LargeEdge = family.Large; // Small/Large channels are taken from the leader family comp->Writer.BlobsChannels = 
room->Blobs; diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 7c08925a47..5369017fe3 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -591,7 +591,7 @@ class TExecutor ui64 OwnerTabletId() const override; const NTable::TScheme& DatabaseScheme() override; - TIntrusiveConstPtr<NTable::TRowScheme> RowScheme(ui32 table) override; + TIntrusiveConstPtr<NTable::TRowScheme> RowScheme(ui32 table) const override; const NTable::TScheme::TTableInfo* TableScheme(ui32 table) override; ui64 TableMemSize(ui32 table, NTable::TEpoch epoch) override; NTable::TPartView TablePart(ui32 table, const TLogoBlobID& label) override; @@ -652,6 +652,8 @@ public: bool CancelScan(ui32 tableId, ui64 taskId) override; TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 tableId) const override; + bool HasSchemaChanges(ui32 table) const override; + bool HasSchemaChanges(const NTable::TPartView& partView, const NTable::TScheme::TTableInfo& tableInfo, const NTable::TRowScheme& rowScheme) const; ui64 CompactBorrowed(ui32 tableId) override; ui64 CompactMemTable(ui32 tableId) override; ui64 CompactTable(ui32 tableId) override; diff --git a/ydb/core/tablet_flat/flat_stat_table.cpp b/ydb/core/tablet_flat/flat_stat_table.cpp index afacbe7f37..5f74a4319f 100644 --- a/ydb/core/tablet_flat/flat_stat_table.cpp +++ b/ydb/core/tablet_flat/flat_stat_table.cpp @@ -23,14 +23,14 @@ bool BuildStats(const TSubset& subset, TStats& stats, ui64 rowCountResolution, u } void GetPartOwners(const TSubset& subset, THashSet<ui64>& partOwners) { - for (auto& pi : subset.Flatten) { - partOwners.insert(pi->Label.TabletID()); + for (const auto& partView : subset.Flatten) { + partOwners.insert(partView->Label.TabletID()); } - for (auto& pi : subset.ColdParts) { - partOwners.insert(pi->Label.TabletID()); + for (const auto& coldPart : subset.ColdParts) { + partOwners.insert(coldPart->Label.TabletID()); } - for (auto& pi : subset.TxStatus) { 
- partOwners.insert(pi->Label.TabletID()); + for (const auto& txStatus : subset.TxStatus) { + partOwners.insert(txStatus->Label.TabletID()); } } diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index fd210f3bca..8475ea7529 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -1398,13 +1398,12 @@ bool TTable::RemoveRowVersions(const TRowVersion& lower, const TRowVersion& uppe TCompactionStats TTable::GetCompactionStats() const { - TCompactionStats stats; - stats.MemRowCount = GetMemRowCount(); - stats.MemDataSize = GetMemSize(); - stats.MemDataWaste = GetMemWaste(); - stats.PartCount = Flatten.size() + ColdParts.size(); - - return stats; + return { + .PartCount = Flatten.size() + ColdParts.size(), + .MemRowCount = GetMemRowCount(), + .MemDataSize = GetMemSize(), + .MemDataWaste = GetMemWaste(), + }; } void TTable::SetTableObserver(TIntrusivePtr<ITableObserver> ptr) noexcept diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index e56860067f..eb6ae0abcf 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -585,6 +585,7 @@ namespace NFlatExecutorSetup { // edge and ts of last full compaction virtual TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 tableId) const = 0; + virtual bool HasSchemaChanges(ui32 table) const = 0; // Forces full compaction of the specified table in the near future // Returns 0 if can't compact, otherwise compaction ID diff --git a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h index 8d466bd15b..25fefd79b4 100644 --- a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h +++ b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h @@ -47,7 +47,7 @@ public: return DB.GetScheme(); } - TIntrusiveConstPtr<NKikimr::NTable::TRowScheme> RowScheme(ui32 table) override { + TIntrusiveConstPtr<NKikimr::NTable::TRowScheme> 
RowScheme(ui32 table) const override { return DB.GetRowScheme(table); } diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 6771f4517e..a8c90003f0 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -138,9 +138,9 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll) , CanCancelROWithReadSets(0, 0, 1) , PerShardReadSizeLimit(5368709120, 0, 107374182400) - , CpuUsageReportThreshlodPercent(60, -1, 146) + , CpuUsageReportThresholdPercent(60, -1, 146) , CpuUsageReportIntervalSeconds(60, 0, 365*86400) - , HighDataSizeReportThreshlodBytes(10ull<<30, -1, Max<i64>()) + , HighDataSizeReportThresholdBytes(10ull<<30, -1, Max<i64>()) , HighDataSizeReportIntervalSeconds(60, 0, 365*86400) , DataTxProfileLogThresholdMs(0, 0, 86400000) , DataTxProfileBufferThresholdMs(0, 0, 86400000) @@ -308,9 +308,9 @@ void TDataShard::IcbRegister() { appData->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets"); appData->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit"); - appData->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent"); + appData->Icb->RegisterSharedControl(CpuUsageReportThresholdPercent, "DataShardControls.CpuUsageReportThreshlodPercent"); appData->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds"); - appData->Icb->RegisterSharedControl(HighDataSizeReportThreshlodBytes, "DataShardControls.HighDataSizeReportThreshlodBytes"); + appData->Icb->RegisterSharedControl(HighDataSizeReportThresholdBytes, "DataShardControls.HighDataSizeReportThreshlodBytes"); appData->Icb->RegisterSharedControl(HighDataSizeReportIntervalSeconds, "DataShardControls.HighDataSizeReportIntervalSeconds"); 
appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo"); diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp index eefd431321..be8842c637 100644 --- a/ydb/core/tx/datashard/datashard__compaction.cpp +++ b/ydb/core/tx/datashard/datashard__compaction.cpp @@ -108,7 +108,8 @@ public: auto stats = txc.DB.GetCompactionStats(localTid); bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0; bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0; - if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) { + bool hasSchemaChanges = Self->Executor()->HasSchemaChanges(tableInfo.LocalTid); + if (isEmpty || isSingleParted && !hasBorrowed && !hasSchemaChanges && !record.GetCompactSinglePartedShards()) { // nothing to compact LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Background compaction of tablet# " << Self->TabletID() diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp index 4246a54e32..f92acafecf 100644 --- a/ydb/core/tx/datashard/datashard__stats.cpp +++ b/ydb/core/tx/datashard/datashard__stats.cpp @@ -15,7 +15,24 @@ namespace NDataShard { using namespace NResourceBroker; using namespace NTable; -class TTableStatsCoroBuilder : public TActorCoroImpl, private IPages { +struct TTableStatsCoroBuilderArgs { + TActorId ReplyTo; + ui64 TabletId; + ui64 TableId; + TActorId ExecutorId; + ui64 IndexSize; + TAutoPtr<TSubset> Subset; + ui64 MemRowCount; + ui64 MemDataSize; + ui64 RowCountResolution; + ui64 DataSizeResolution; + ui32 HistogramBucketsCount; + ui64 SearchHeight; + bool HasSchemaChanges; + TInstant StatsUpdateTime; +}; + +class TTableStatsCoroBuilder : public TActorCoroImpl, private IPages, TTableStatsCoroBuilderArgs { private: using ECode = TDataShard::TEvPrivate::TEvTableStatsError::ECode; @@ -40,23 +57,9 @@ private: }; public: - TTableStatsCoroBuilder(TActorId 
replyTo, ui64 tabletId, ui64 tableId, TActorId executorId, ui64 indexSize, - const TAutoPtr<TSubset> subset, ui64 memRowCount, ui64 memDataSize, - ui64 rowCountResolution, ui64 dataSizeResolution, ui32 histogramBucketsCount, ui64 searchHeight, TInstant statsUpdateTime) + TTableStatsCoroBuilder(TTableStatsCoroBuilderArgs args) : TActorCoroImpl(/* stackSize */ 64_KB, /* allowUnhandledDtor */ true) - , ReplyTo(replyTo) - , TabletId(tabletId) - , TableId(tableId) - , ExecutorId(executorId) - , IndexSize(indexSize) - , StatsUpdateTime(statsUpdateTime) - , Subset(subset) - , MemRowCount(memRowCount) - , MemDataSize(memDataSize) - , RowCountResolution(rowCountResolution) - , DataSizeResolution(dataSizeResolution) - , HistogramBucketsCount(histogramBucketsCount) - , SearchHeight(searchHeight) + , TTableStatsCoroBuilderArgs(args) {} void Run() override { @@ -138,6 +141,7 @@ private: ev->MemRowCount = MemRowCount; ev->MemDataSize = MemDataSize; ev->SearchHeight = SearchHeight; + ev->HasSchemaChanges = HasSchemaChanges; GetPartOwners(*Subset, ev->PartOwners); @@ -166,6 +170,7 @@ private: LOG_DEBUG_S(GetActorContext(), NKikimrServices::TX_DATASHARD, "BuildStats result at datashard " << TabletId << ", for tableId " << TableId << ": RowCount " << ev->Stats.RowCount << ", DataSize " << ev->Stats.DataSize.Size << ", IndexSize " << ev->Stats.IndexSize.Size << ", PartCount " << ev->PartCount << (ev->PartOwners.size() > 1 || ev->PartOwners.size() == 1 && *ev->PartOwners.begin() != TabletId ? ", with borrowed parts" : "") + << (ev->HasSchemaChanges ? 
", with schema changes" : "") << ", LoadedSize " << PagesSize << ", " << NFmt::Do(*Spent) << ", HistogramKeys " << ev->Stats.DataSizeHistogram.size()); Send(ReplyTo, ev.Release()); @@ -223,19 +228,6 @@ private: Send(MakeResourceBrokerID(), new TEvResourceBroker::TEvFinishTask(/* task id */ 1, /* cancelled */ false)); } - TActorId ReplyTo; - ui64 TabletId; - ui64 TableId; - TActorId ExecutorId; - ui64 IndexSize; - TInstant StatsUpdateTime; - TAutoPtr<TSubset> Subset; - ui64 MemRowCount; - ui64 MemDataSize; - ui64 RowCountResolution; - ui64 DataSizeResolution; - ui32 HistogramBucketsCount; - ui64 SearchHeight; THashMap<const TPart*, THashMap<TPageId, TSharedData>> Pages; ui64 PagesSize = 0; ui64 CoroutineDeadline; @@ -308,6 +300,7 @@ public: Result->Record.MutableTableStats()->SetPartCount(tableInfo.Stats.PartCount); Result->Record.MutableTableStats()->SetSearchHeight(tableInfo.Stats.SearchHeight); + Result->Record.MutableTableStats()->SetHasSchemaChanges(tableInfo.Stats.HasSchemaChanges); Result->Record.MutableTableStats()->SetLastFullCompactionTs(tableInfo.Stats.LastFullCompaction.Seconds()); Result->Record.MutableTableStats()->SetHasLoanedParts(Self->Executor()->HasLoanedParts()); @@ -363,38 +356,36 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo Actors.erase(ev->Sender); ui64 tableId = ev->Get()->TableId; - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "BuildStats result received at datashard " << TabletID() << ", for tableId " << tableId); - i64 dataSize = 0; - if (TableInfos.contains(tableId)) { - const TUserTable& tableInfo = *TableInfos[tableId]; + if (!TableInfos.contains(tableId)) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "BuildStats result dropped at datashard " << TabletID() + << ", built for tableId " << tableId << ", but table is gone (moved ot dropped)"); + return; + } - if (!tableInfo.StatsUpdateInProgress) { - // How can this happen? 
- LOG_ERROR(ctx, NKikimrServices::TX_DATASHARD, - "Unexpected async stats update at datashard %" PRIu64, TabletID()); - } - tableInfo.Stats.DataStats = std::move(ev->Get()->Stats); - tableInfo.Stats.PartOwners = std::move(ev->Get()->PartOwners); - tableInfo.Stats.PartCount = ev->Get()->PartCount; - tableInfo.Stats.StatsUpdateTime = ev->Get()->StatsUpdateTime; - tableInfo.Stats.MemRowCount = ev->Get()->MemRowCount; - tableInfo.Stats.MemDataSize = ev->Get()->MemDataSize; + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "BuildStats result received at datashard " << TabletID() + << ", for tableId " << tableId); - dataSize += tableInfo.Stats.DataStats.DataSize.Size; + const TUserTable& tableInfo = *TableInfos[tableId]; - tableInfo.Stats.SearchHeight = ev->Get()->SearchHeight; + if (!tableInfo.StatsUpdateInProgress) { // how can this happen? + LOG_ERROR(ctx, NKikimrServices::TX_DATASHARD, "Unexpected async stats update at datashard %" PRIu64, TabletID()); + } - tableInfo.StatsUpdateInProgress = false; + tableInfo.Stats.DataStats = std::move(ev->Get()->Stats); + tableInfo.Stats.PartOwners = std::move(ev->Get()->PartOwners); + tableInfo.Stats.PartCount = ev->Get()->PartCount; + tableInfo.Stats.StatsUpdateTime = ev->Get()->StatsUpdateTime; + tableInfo.Stats.MemRowCount = ev->Get()->MemRowCount; + tableInfo.Stats.MemDataSize = ev->Get()->MemDataSize; + tableInfo.Stats.SearchHeight = ev->Get()->SearchHeight; + tableInfo.Stats.HasSchemaChanges = ev->Get()->HasSchemaChanges; - SendPeriodicTableStats(ctx); + tableInfo.StatsUpdateInProgress = false; - } else { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Drop stats at datashard " << TabletID() - << ", built for tableId " << tableId << ", but table is gone (moved ot dropped)"); - } + SendPeriodicTableStats(ctx); - if (dataSize > HighDataSizeReportThreshlodBytes) { + if (static_cast<i64>(tableInfo.Stats.DataStats.DataSize.Size) > HighDataSizeReportThresholdBytes) { TInstant now = AppData(ctx)->TimeProvider->Now(); if 
(LastDataSizeWarnTime + TDuration::Seconds(HighDataSizeReportIntervalSeconds) > now) @@ -405,11 +396,11 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo TStringBuilder names; ListTableNames(GetUserTables(), names); - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Data size " << dataSize - << " is higher than threshold of " << (i64)HighDataSizeReportThreshlodBytes - << " at datashard: " << TabletID() - << " table: " << names - << " consider reconfiguring table partitioning settings"); + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Data size " << tableInfo.Stats.DataStats.DataSize.Size + << " is higher than threshold of " << (i64)HighDataSizeReportThresholdBytes + << " at datashard: " << TabletID() + << " table: " << names + << " consider reconfiguring table partitioning settings"); } } @@ -470,6 +461,7 @@ public: for (auto& ti : Self->TableInfos) { const ui32 localTableId = ti.second->LocalTid; const ui32 shadowTableId = ti.second->ShadowTid; + const ui64 tableId = ti.first; CheckIdleMemCompaction(*ti.second, txc, ctx); @@ -488,21 +480,28 @@ public: searchHeight = 0; } + // Note: ignore shadow table for simplicity + bool hasSchemaChanges = Self->Executor()->HasSchemaChanges(localTableId); + if (!ti.second->StatsNeedUpdate) { - ti.second->Stats.MemRowCount = memRowCount; - ti.second->Stats.MemDataSize = memDataSize; - ti.second->Stats.SearchHeight = searchHeight; + auto& stats = ti.second->Stats; + stats.MemRowCount = memRowCount; + stats.MemDataSize = memDataSize; + stats.SearchHeight = searchHeight; + stats.HasSchemaChanges = hasSchemaChanges; + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "BuildStats skipped at datashard " << Self->TabletID() << ", for tableId " << tableId + << ": RowCount " << stats.DataStats.RowCount << ", DataSize " << stats.DataStats.DataSize.Size << ", IndexSize " << stats.DataStats.IndexSize.Size << ", PartCount " << stats.PartCount + << (stats.HasSchemaChanges ? 
", with schema changes" : "")); + continue; } const ui32 MaxBuckets = 500; - - ui64 tableId = ti.first; ui64 rowCountResolution = gDbStatsRowCountResolution; ui64 dataSizeResolution = gDbStatsDataSizeResolution; ui32 histogramBucketsCount = gDbStatsHistogramBucketsCount; - if (ti.second->Stats.DataSizeResolution && ti.second->Stats.DataStats.DataSize.Size / ti.second->Stats.DataSizeResolution <= MaxBuckets) { @@ -547,19 +546,23 @@ public: shadowSubset->ColdParts.end()); } - auto builder = new TActorCoro(MakeHolder<TTableStatsCoroBuilder>(ctx.SelfID, - Self->TabletID(), - tableId, - Self->ExecutorID(), - indexSize, - subsetForStats, - memRowCount, - memDataSize, - rowCountResolution, - dataSizeResolution, - histogramBucketsCount, - searchHeight, - AppData(ctx)->TimeProvider->Now()), NKikimrServices::TActivity::DATASHARD_STATS_BUILDER); + auto builder = new TActorCoro(MakeHolder<TTableStatsCoroBuilder>( + TTableStatsCoroBuilderArgs{ + .ReplyTo = ctx.SelfID, + .TabletId = Self->TabletID(), + .TableId = tableId, + .ExecutorId = Self->ExecutorID(), + .IndexSize = indexSize, + .Subset = subsetForStats, + .MemRowCount = memRowCount, + .MemDataSize = memDataSize, + .RowCountResolution = rowCountResolution, + .DataSizeResolution = dataSizeResolution, + .HistogramBucketsCount = histogramBucketsCount, + .SearchHeight = searchHeight, + .HasSchemaChanges = hasSchemaChanges, + .StatsUpdateTime = AppData(ctx)->TimeProvider->Now() + }), NKikimrServices::TActivity::DATASHARD_STATS_BUILDER); TActorId actorId = ctx.Register(builder, TMailboxType::HTSwap, AppData(ctx)->BatchPoolId); Self->Actors.insert(actorId); @@ -614,7 +617,7 @@ void TDataShard::CollectCpuUsage(const TActorContext &ctx) { ui64 cpuUsec = metrics->CPU.GetValue(); float cpuPercent = cpuUsec / 10000.0; - if (cpuPercent > CpuUsageReportThreshlodPercent) { + if (cpuPercent > CpuUsageReportThresholdPercent) { if (LastCpuWarnTime + TDuration::Seconds(CpuUsageReportIntervalSeconds) > now) return; @@ -624,7 +627,7 @@ void 
TDataShard::CollectCpuUsage(const TActorContext &ctx) { ListTableNames(GetUserTables(), names); LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "CPU usage " << cpuPercent - << "% is higher than threshold of " << (i64)CpuUsageReportThreshlodPercent + << "% is higher than threshold of " << (i64)CpuUsageReportThresholdPercent << "% in-flight Tx: " << TxInFly() << " immediate Tx: " << ImmediateInFly() << " readIterators: " << ReadIteratorsInFly() diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index a5c275c22f..aab59d0742 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -387,6 +387,7 @@ class TDataShard ui64 MemRowCount = 0; ui64 MemDataSize = 0; ui64 SearchHeight = 0; + bool HasSchemaChanges = false; }; struct TEvTableStatsError : public TEventLocal<TEvTableStatsError, EvTableStatsError> { @@ -2726,9 +2727,9 @@ private: TControlWrapper MaxTxLagMilliseconds; TControlWrapper CanCancelROWithReadSets; TControlWrapper PerShardReadSizeLimit; - TControlWrapper CpuUsageReportThreshlodPercent; + TControlWrapper CpuUsageReportThresholdPercent; TControlWrapper CpuUsageReportIntervalSeconds; - TControlWrapper HighDataSizeReportThreshlodBytes; + TControlWrapper HighDataSizeReportThresholdBytes; TControlWrapper HighDataSizeReportIntervalSeconds; TControlWrapper DataTxProfileLogThresholdMs; @@ -3299,6 +3300,7 @@ protected: ev->Record.MutableTableStats()->SetPartCount(ti.Stats.PartCount); ev->Record.MutableTableStats()->SetSearchHeight(ti.Stats.SearchHeight); + ev->Record.MutableTableStats()->SetHasSchemaChanges(ti.Stats.HasSchemaChanges); ev->Record.MutableTableStats()->SetLastFullCompactionTs(ti.Stats.LastFullCompaction.Seconds()); ev->Record.MutableTableStats()->SetHasLoanedParts(Executor()->HasLoanedParts()); diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 2feea92c97..9845ef7f8d 100644 --- 
a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -394,6 +394,7 @@ struct TUserTable : public TThrRefBase { THashSet<ui64> PartOwners; ui64 PartCount = 0; ui64 SearchHeight = 0; + bool HasSchemaChanges = false; TInstant StatsUpdateTime; ui64 DataSizeResolution = 0; ui64 RowCountResolution = 0; diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 7384a1f43a..5ac95e737f 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -3591,7 +3591,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { if (Commit) expectedRowCount += writeCount * rowCount; - ui64 statRowCount = WaitTableStats(*runtime, tabletId, 0, expectedRowCount).GetTableStats().GetRowCount(); + ui64 statRowCount = WaitTableStats(*runtime, tabletId, [expectedRowCount](const NKikimrTableStats::TTableStats& stats) { + return stats.GetRowCount() >= expectedRowCount; + }).GetTableStats().GetRowCount(); UNIT_ASSERT_VALUES_EQUAL(statRowCount, expectedRowCount); } } diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp index 8b845b0f4b..ff081be3e7 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator_ext_blobs.cpp @@ -214,7 +214,9 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { { Cerr << "... 
waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1); + auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= 1; + }); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 10); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); @@ -522,7 +524,9 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1); + auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= 1; + }); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 10); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); @@ -591,7 +595,9 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1); + auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= 1; + }); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), compactedPart); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); @@ -683,7 +689,9 @@ Y_UNIT_TEST_SUITE(ReadIteratorExternalBlobs) { { Cerr << "... 
waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1); + auto stats = WaitTableStats(runtime, shard1, [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= 1; + }); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 10); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); diff --git a/ydb/core/tx/datashard/datashard_ut_stats.cpp b/ydb/core/tx/datashard/datashard_ut_stats.cpp index 14a6fa25e3..2ce191382e 100644 --- a/ydb/core/tx/datashard/datashard_ut_stats.cpp +++ b/ydb/core/tx/datashard/datashard_ut_stats.cpp @@ -21,6 +21,32 @@ namespace { } ExecSQL(server, sender, query); } + + std::function<bool(const NKikimrTableStats::TTableStats& stats)> HasPartCountCondition(ui64 count) { + return [count](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= count; + }; + } + + std::function<bool(const NKikimrTableStats::TTableStats& stats)> HasRowCountCondition(ui64 count) { + return [count](const NKikimrTableStats::TTableStats& stats) { + return stats.GetRowCount() >= count; + }; + } + + std::function<bool(const NKikimrTableStats::TTableStats& stats)> HasSchemaChangesCondition() { + Cerr << "waiting for schema changes" << Endl; + return [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetHasSchemaChanges(); + }; + } + + std::function<bool(const NKikimrTableStats::TTableStats& stats)> DoesNotHaveSchemaChangesCondition() { + Cerr << "waiting for no schema changes" << Endl; + return [](const NKikimrTableStats::TTableStats& stats) { + return !stats.GetHasSchemaChanges(); + }; + } } Y_UNIT_TEST_SUITE(DataShardStats) { @@ -80,7 +106,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { { Cerr << "... 
waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1); + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1)); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); @@ -144,7 +170,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1); + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1)); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); @@ -201,7 +227,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1); + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1)); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 2000); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); @@ -283,7 +309,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1); + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1)); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); @@ -340,7 +366,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { CompactTable(runtime, shard1, tableId1, false); Cerr << "... 
waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, shard1, 1, (batch + 1) * batchItems); + auto stats = WaitTableStats(runtime, shard1, HasRowCountCondition((batch + 1) * batchItems)); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), (batch + 1) * batchItems); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); @@ -403,7 +429,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5), (6, 6), (7, 7), (8, 8)"); { Cerr << "... waiting for stats" << Endl; - auto stats = WaitTableStats(runtime, shard1, 2); + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(2)); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 2); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 7); @@ -490,7 +516,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2);"); - { + { Cerr << "... 
waiting leader stats" << Endl; auto stats = WaitTableStats(runtime, shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); @@ -514,8 +540,199 @@ Y_UNIT_TEST_SUITE(DataShardStats) { UNIT_ASSERT_LE(stats.GetFollowerId(), 3); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRangeReadRows(), 2); } - } + } + + Y_UNIT_TEST(HasSchemaChanges_BTreeIndex) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1); + ui64 shard1 = shards.at(0); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"); + + CompactTable(runtime, shard1, tableId1, false); + { + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1)); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetHasSchemaChanges(), false); + } + + runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(true); + WaitTableStats(runtime, shard1, HasSchemaChangesCondition()); + CompactTable(runtime, shard1, tableId1, false); + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + + runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false); + // turn off doesn't trigger compaction: + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + // even after restart: + RebootTablet(runtime, shard1, sender); + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + } + + Y_UNIT_TEST(HasSchemaChanges_ByKeyFilter) { + TPortManager pm; + 
TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1); + ui64 shard1 = shards.at(0); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"); + + CompactTable(runtime, shard1, tableId1, false); + + { + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1)); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetHasSchemaChanges(), false); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetByKeyFilterSize(), 0); + } -} // Y_UNIT_TEST_SUITE(DataShardStats) + WaitTxNotification(server, sender, + AsyncSetEnableFilterByKey(server, "/Root", "table-1", true)); + { + auto stats = WaitTableStats(runtime, shard1, HasSchemaChangesCondition()); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetByKeyFilterSize(), 0); + } + CompactTable(runtime, shard1, tableId1, false); + { + auto stats = WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + UNIT_ASSERT_GT(stats.GetTableStats().GetByKeyFilterSize(), 0); + } -} // namespace NKikimr + WaitTxNotification(server, sender, + AsyncSetEnableFilterByKey(server, "/Root", "table-1", false)); + { + auto stats = WaitTableStats(runtime, shard1, HasSchemaChangesCondition()); + UNIT_ASSERT_GT(stats.GetTableStats().GetByKeyFilterSize(), 0); + } + CompactTable(runtime, shard1, tableId1, false); + { + auto stats = WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetByKeyFilterSize(), 0); + } + } + + 
Y_UNIT_TEST(HasSchemaChanges_Columns) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1); + ui64 shard1 = shards.at(0); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"); + + CompactTable(runtime, shard1, tableId1, false); + + { + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1)); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetHasSchemaChanges(), false); + } + + WaitTxNotification(server, sender, + AsyncAlterAddExtraColumn(server, "/Root", "table-1")); + WaitTableStats(runtime, shard1, HasSchemaChangesCondition()); + CompactTable(runtime, shard1, tableId1, false); + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + + WaitTxNotification(server, sender, + AsyncAlterDropColumn(server, "/Root", "table-1", "extra")); + WaitTableStats(runtime, shard1, HasSchemaChangesCondition()); + CompactTable(runtime, shard1, tableId1, false); + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + } + + Y_UNIT_TEST(HasSchemaChanges_Families) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .AddStoragePool("ssd") + .AddStoragePool("hdd"); + + TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.GetAppData().FeatureFlags.SetEnableLocalDBBtreeIndex(false); + + 
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false}}) + .Families({{.Name = "default", .LogPoolKind = "ssd", .SysLogPoolKind = "ssd", .DataPoolKind = "ssd"}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"); + + WaitTxNotification(server, sender, + AsyncAlterAddExtraColumn(server, "/Root", "table-1")); + + CompactTable(runtime, shard1, tableId1, false); + { + auto stats = WaitTableStats(runtime, shard1, HasPartCountCondition(1)); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetHasSchemaChanges(), false); + } + + WaitTxNotification(server, sender, + AsyncSetColumnFamily(server, "/Root", "table-1", "value2", {.Name = "hdd", .DataPoolKind = "hdd"})); + WaitTableStats(runtime, shard1, HasSchemaChangesCondition()); + CompactTable(runtime, shard1, tableId1, false); + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + + WaitTxNotification(server, sender, + AsyncSetColumnFamily(server, "/Root", "table-1", "extra", {.Name = "hdd", .DataPoolKind = "hdd"})); + WaitTableStats(runtime, shard1, HasSchemaChangesCondition()); + CompactTable(runtime, shard1, tableId1, false); + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + + WaitTxNotification(server, sender, + AsyncSetColumnFamily(server, "/Root", "table-1", "extra", {.Name = "default", .DataPoolKind = "ssd"})); + WaitTableStats(runtime, shard1, HasSchemaChangesCondition()); + CompactTable(runtime, shard1, tableId1, false); + WaitTableStats(runtime, shard1, DoesNotHaveSchemaChangesCondition()); + } + +} + +} diff 
--git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 17cdf0d6eb..b840ed0752 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1657,6 +1657,47 @@ ui64 AsyncAlterDropColumn( return RunSchemeTx(*server->GetRuntime(), std::move(request)); } +ui64 AsyncSetEnableFilterByKey( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& name, + bool value) +{ + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable(); + desc.SetName(name); + desc.MutablePartitionConfig()->SetEnableFilterByKey(value); + + return RunSchemeTx(*server->GetRuntime(), std::move(request)); +} + +ui64 AsyncSetColumnFamily( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& name, + const TString& colName, + TShardedTableOptions::TFamily family) +{ + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable(); + desc.SetName(name); + + auto col = desc.AddColumns(); + col->SetName(colName); + col->SetFamilyName(family.Name); + + auto fam = desc.MutablePartitionConfig()->AddColumnFamilies(); + if (family.Name) fam->SetName(family.Name); + if (family.LogPoolKind) fam->MutableStorageConfig()->MutableLog()->SetPreferredPoolKind(family.LogPoolKind); + if (family.SysLogPoolKind) fam->MutableStorageConfig()->MutableSysLog()->SetPreferredPoolKind(family.SysLogPoolKind); + if (family.DataPoolKind) fam->MutableStorageConfig()->MutableData()->SetPreferredPoolKind(family.DataPoolKind); + if (family.ExternalPoolKind) fam->MutableStorageConfig()->MutableExternal()->SetPreferredPoolKind(family.ExternalPoolKind); + if (family.DataThreshold) 
fam->MutableStorageConfig()->SetDataThreshold(family.DataThreshold); + if (family.ExternalThreshold) fam->MutableStorageConfig()->SetExternalThreshold(family.ExternalThreshold); + + return RunSchemeTx(*server->GetRuntime(), std::move(request)); +} + ui64 AsyncAlterAndDisableShadow( Tests::TServer::TPtr server, const TString& workingDir, @@ -1903,7 +1944,9 @@ void WaitTableStatsImpl(TTestActorRuntime& runtime, UNIT_ASSERT(captured); } -NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 minRowReads, ui64 minRangeReadRows) { +NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRuntime& runtime, ui64 datashardId, + std::function<bool(const NKikimrTableStats::TTableStats& stats)> condition) +{ NKikimrTxDataShard::TEvPeriodicTableStats stats; bool captured = false; @@ -1914,10 +1957,10 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRunti if (!record.GetFollowerId()) return; - if (record.GetDatashardId() != tabletId) + if (record.GetDatashardId() != datashardId) return; - if (record.GetTableStats().GetRowReads() < minRowReads || record.GetTableStats().GetRangeReadRows() < minRangeReadRows) + if (!condition(record.GetTableStats())) return; stats = record; @@ -1929,7 +1972,9 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRunti } -NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 minPartCount, ui64 minRows) { +NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 datashardId, + std::function<bool(const NKikimrTableStats::TTableStats& stats)> condition) +{ NKikimrTxDataShard::TEvPeriodicTableStats stats; bool captured = false; @@ -1940,10 +1985,10 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runt if (record.GetFollowerId()) return; - if (record.GetDatashardId() != tabletId) + if (record.GetDatashardId() != 
datashardId) return; - if (record.GetTableStats().GetPartCount() < minPartCount || record.GetTableStats().GetRowCount() < minRows) + if (!condition(record.GetTableStats())) return; stats = record; diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 4aa4f18a01..fde8d57384 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -636,6 +636,19 @@ ui64 AsyncAlterDropColumn( const TString& name, const TString& colName); +ui64 AsyncSetEnableFilterByKey( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& name, + bool value); + +ui64 AsyncSetColumnFamily( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& name, + const TString& colName, + TShardedTableOptions::TFamily family); + ui64 AsyncAlterAndDisableShadow( Tests::TServer::TPtr server, const TString& workingDir, @@ -724,8 +737,10 @@ TString ReadShardedTable( void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId); void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId); -NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 minPartCount = 0, ui64 minRows = 0); -NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 minRowReads = 0, ui64 minRangeReadRows = 0); +NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 datashardId, + std::function<bool(const NKikimrTableStats::TTableStats& stats)> condition = [](const NKikimrTableStats::TTableStats&)->bool{return true;}); +NKikimrTxDataShard::TEvPeriodicTableStats WaitTableFollowerStats(TTestActorRuntime& runtime, ui64 datashardId, + std::function<bool(const NKikimrTableStats::TTableStats& stats)> condition = [](const NKikimrTableStats::TTableStats&)->bool{return true;}); void 
SimulateSleep(Tests::TServer::TPtr server, TDuration duration); void SimulateSleep(TTestActorRuntime& runtime, TDuration duration); diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h index 5dd37e7cab..c17056ff8e 100644 --- a/ydb/core/tx/schemeshard/operation_queue_timer.h +++ b/ydb/core/tx/schemeshard/operation_queue_timer.h @@ -113,8 +113,8 @@ struct TShardCompactionInfo { ui64 LastFullCompactionTs = 0; ui64 RowCount = 0; ui64 RowDeletes = 0; - ui64 PartCount = 0; + bool HasSchemaChanges = false; explicit TShardCompactionInfo(const TShardIdx& id) : ShardIdx(id) @@ -127,6 +127,7 @@ struct TShardCompactionInfo { , RowCount(stats.RowCount) , RowDeletes(stats.RowDeletes) , PartCount(stats.PartCount) + , HasSchemaChanges(stats.HasSchemaChanges) {} TShardCompactionInfo(const TShardCompactionInfo&) = default; @@ -262,7 +263,8 @@ public: // ignore single parted shard if needed bool isSingleParted = info.PartCount == 1; - if (!Config.CompactSinglePartedShards && isSingleParted) + bool hasSchemaChanges = info.HasSchemaChanges; + if (!Config.CompactSinglePartedShards && isSingleParted && !hasSchemaChanges) return false; if (info.RowCount < Config.RowCountThreshold) diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index e3cc67d2b6..ef02421c39 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -212,6 +212,7 @@ TPartitionStats TTxStoreTableStats::PrepareStats(const TActorContext& ctx, newStats.FullCompactionTs = tableStats.GetLastFullCompactionTs(); newStats.MemDataSize = tableStats.GetInMemSize(); newStats.StartTime = TInstant::MilliSeconds(rec.GetStartTime()); + newStats.HasSchemaChanges = tableStats.GetHasSchemaChanges(); newStats.HasLoanedData = tableStats.GetHasLoanedParts(); for (ui64 tabletId : rec.GetUserTablePartOwners()) { 
newStats.PartOwners.insert(TTabletId(tabletId)); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index d3fdc89e67..8a04416a2b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -278,6 +278,8 @@ struct TPartitionStats { // True when lent parts to other tablets bool HasLoanedData = false; + bool HasSchemaChanges = false; + // Tablet actor started at TInstant StartTime; diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 7ca17066cb..b11c249bf2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -42,6 +42,7 @@ static void FillTableStats(NKikimrTableStats::TTableStats* stats, const TPartiti stats->SetRangeReadRows(tableStats.RangeReadRows); stats->SetPartCount(tableStats.PartCount); + stats->SetHasSchemaChanges(tableStats.HasSchemaChanges); auto* storagePoolsStats = stats->MutableStoragePools()->MutablePoolsUsage(); for (const auto& [poolKind, stats] : tableStats.StoragePoolsStats) { diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 0e8f3b31ea..efcf6a89e5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -377,7 +377,7 @@ struct Schema : NIceDb::Schema { struct FullCompactionTs : Column<31, NScheme::NTypeIds::Uint64> { static constexpr ui64 Default = 0; }; struct MemDataSize : Column<32, NScheme::NTypeIds::Uint64> { static constexpr ui64 Default = 0; }; - // PartCount, PartOwners & ShardState are volatile data + // PartCount, PartOwners, ShardState, HasSchemaChanges are volatile data // Represented by NKikimrTableStats::TStoragePoolsStats. 
struct StoragePoolsStats : Column<33, NScheme::NTypeIds::String> { using Type = TString; }; diff --git a/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp index 8297cc3b6f..bd90a89af2 100644 --- a/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp +++ b/ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp @@ -659,7 +659,9 @@ Y_UNIT_TEST_SUITE(TStoragePoolsStatsPersistence) { NKikimrTxDataShard::TEvCompactTableResult::OK ); // we wait for at least 1 part count, because it signals that the stats have been recalculated after compaction - WaitTableStats(runtime, datashard, 1, rowsCount).GetTableStats(); + WaitTableStats(runtime, datashard, [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= 1; + }); auto checkUsage = [&poolsKinds](ui64 totalUsage, const auto& poolUsage) { if (IsIn(poolsKinds, poolUsage.GetPoolKind())) { diff --git a/ydb/services/ydb/ydb_ut.cpp b/ydb/services/ydb/ydb_ut.cpp index 8dc54a48b4..c8ecc4e180 100644 --- a/ydb/services/ydb/ydb_ut.cpp +++ b/ydb/services/ydb/ydb_ut.cpp @@ -5700,7 +5700,9 @@ Y_UNIT_TEST(DisableWritesToDatabase) { // try upsert when the feature flag is enabled { runtime.GetAppData().FeatureFlags.SetEnableSeparateDiskSpaceQuotas(true); - WaitTableStats(runtime, datashard, 1); + WaitTableStats(runtime, datashard, [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= 1; + }); upsert(table, "2u, \"Bar\"", Ydb::StatusIds::UNAVAILABLE); checkDatabaseState(tenantPath, true); } @@ -5708,7 +5710,9 @@ Y_UNIT_TEST(DisableWritesToDatabase) { // try upsert when the feature flag is disabled { runtime.GetAppData().FeatureFlags.SetEnableSeparateDiskSpaceQuotas(false); - WaitTableStats(runtime, datashard, 1); + WaitTableStats(runtime, datashard, [](const NKikimrTableStats::TTableStats& stats) { + return stats.GetPartCount() >= 1; + }); upsert(table, "2u, \"Bar\""); checkDatabaseState(tenantPath, false); } |