about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov333 <ivanmorozov@ydb.tech> 2024-10-31 10:09:01 +0300
committer GitHub <noreply@github.com> 2024-10-31 10:09:01 +0300
commit 21fa904d5864271db615aa2ea5fa53419beb32cb (patch)
tree 8a0a98a1b28207190212f22d0ccf5ff681d39d1a
parent b9d09c35e7ac68cc901fd377249a1d3e091b60a5 (diff)
download ydb-21fa904d5864271db615aa2ea5fa53419beb32cb.tar.gz
actualize local db for columnshards (#11115)
-rw-r--r-- .github/config/muted_ya.txt 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard_schema.h 67
-rw-r--r-- ydb/core/tx/columnshard/common/blob.h 8
-rw-r--r-- ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/with_appended.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp 146
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/db_wrapper.cpp 11
-rw-r--r-- ydb/core/tx/columnshard/engines/db_wrapper.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/constructor.cpp 23
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/constructor.h 3
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp 21
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/constructor_meta.h 7
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/meta.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/meta.h 6
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.cpp 34
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.h 26
-rw-r--r-- ydb/core/tx/columnshard/engines/protos/portion_info.proto 7
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/granule/granule.cpp 42
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/granule/granule.h 18
-rw-r--r-- ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp 26
-rw-r--r-- ydb/core/tx/columnshard/normalizer/abstract/abstract.h 4
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp 184
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h 43
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/clean.h 4
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/portion.h 4
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp 147
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h 43
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/ya.make 2
-rw-r--r-- ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp 85
33 files changed, 703 insertions, 282 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index 3d8ceb6df33..5ae37139dfe 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -45,7 +45,6 @@ ydb/core/kqp/ut/sysview KqpSystemView.PartitionStatsFollower
ydb/core/mind/hive/ut THiveTest.DrainWithHiveRestart
ydb/core/persqueue/ut [*/*] chunk chunk
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
-ydb/core/tx/columnshard/ut_rw Normalizers.CleanEmptyPortionsNormalizer
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithDataAndPersistentPartitionStats
ydb/core/tx/schemeshard/ut_pq_reboots TPqGroupTestReboots.AlterWithReboots-PQConfigTransactionsAtSchemeShard-false
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 37307761375..4c2e9e9c9fb 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -901,27 +901,55 @@ struct Schema : NIceDb::Schema {
}
namespace NKikimr::NOlap {
+class TPortionLoadContext {
+private:
+ YDB_READONLY(ui64, PathId, 0);
+ YDB_READONLY(ui64, PortionId, 0);
+ YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, MetaProto);
+
+public:
+ template <class TSource>
+ TPortionLoadContext(const TSource& rowset) {
+ PathId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PathId>();
+ PortionId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PortionId>();
+ const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
+ AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
+ }
+};
+
class TColumnChunkLoadContext {
private:
YDB_READONLY_DEF(TBlobRange, BlobRange);
TChunkAddress Address;
+ YDB_READONLY(ui64, PathId, 0);
+ YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
+ YDB_READONLY(TSnapshot, RemoveSnapshot, TSnapshot::Zero());
+ YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());
+
public:
const TChunkAddress& GetAddress() const {
return Address;
}
- TColumnChunkLoadContext(const TChunkAddress& address, const TBlobRange& bRange, const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
+ TColumnChunkLoadContext(const ui64 pathId, const ui64 portionId, const TChunkAddress& address, const TBlobRange& bRange,
+ const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
: BlobRange(bRange)
, Address(address)
- , MetaProto(metaProto)
- {
-
+ , PathId(pathId)
+ , PortionId(portionId)
+ , MetaProto(metaProto) {
}
template <class TSource>
TColumnChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
- : Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) {
+ : Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(),
+ rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>())
+ , RemoveSnapshot(rowset.template GetValue<NColumnShard::Schema::IndexColumns::XPlanStep>(),
+ rowset.template GetValue<NColumnShard::Schema::IndexColumns::XTxId>())
+ , MinSnapshotDeprecated(rowset.template GetValue<NColumnShard::Schema::IndexColumns::PlanStep>(),
+ rowset.template GetValue<NColumnShard::Schema::IndexColumns::TxId>())
+ {
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Blob>();
Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
@@ -929,29 +957,38 @@ public:
BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
BlobRange.Offset = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Offset>();
BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Size>();
+ PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::PathId>();
+ PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Portion>();
AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString());
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Metadata>();
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
}
-
- const NKikimrTxColumnShard::TIndexPortionMeta* GetPortionMeta() const {
- if (MetaProto.HasPortionMeta()) {
- return &MetaProto.GetPortionMeta();
- } else {
- return nullptr;
- }
- }
};
class TIndexChunkLoadContext {
private:
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);
YDB_READONLY_DEF(std::optional<TString>, BlobData);
+ YDB_READONLY(ui64, PathId, 0);
+ YDB_READONLY(ui64, PortionId, 0);
TChunkAddress Address;
const ui32 RecordsCount;
const ui32 RawBytes;
public:
+ ui32 GetRawBytes() const {
+ return RawBytes;
+ }
+
+ ui32 GetDataSize() const {
+ if (BlobRange) {
+ return BlobRange->GetSize();
+ } else {
+ AFL_VERIFY(!!BlobData);
+ return BlobData->size();
+ }
+ }
+
TIndexChunk BuildIndexChunk(const TBlobRangeLink16::TLinkId blobLinkId) const {
AFL_VERIFY(BlobRange);
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, RawBytes, BlobRange->BuildLink(blobLinkId));
@@ -964,7 +1001,9 @@ public:
template <class TSource>
TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
- : Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
+ : PathId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PathId>())
+ , PortionId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PortionId>())
+ , Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
, RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>())
, RawBytes(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RawBytes>())
{
diff --git a/ydb/core/tx/columnshard/common/blob.h b/ydb/core/tx/columnshard/common/blob.h
index e1c10a46d40..b7e3f0fa5a7 100644
--- a/ydb/core/tx/columnshard/common/blob.h
+++ b/ydb/core/tx/columnshard/common/blob.h
@@ -189,6 +189,14 @@ struct TBlobRange {
ui32 Offset;
ui32 Size;
+ ui32 GetSize() const {
+ return Size;
+ }
+
+ ui32 GetOffset() const {
+ return Offset;
+ }
+
TString GetData(const TString& blobData) const;
bool operator<(const TBlobRange& br) const {
diff --git a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp
index f5c35e58943..cb960b38c49 100644
--- a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp
+++ b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp
@@ -20,7 +20,7 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
for (auto&& portion : i.second.DetachPortions()) {
portion.MutablePortionInfo().SetPathId(it->second);
- index.AppendPortion(portion.GetPortionInfo());
+ index.AppendPortion(portion.MutablePortionInfoPtr());
}
}
return TConclusionStatus::Success();
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index bb99f3d4c68..46c39964da5 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -111,7 +111,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
context.EngineLogs.AddCleanupPortion(i);
}
for (auto& portionBuilder : AppendedPortions) {
- context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().GetPortionInfo());
+ context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().MutablePortionInfoPtr());
}
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index e4f383e765b..dc1665cf615 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -2,25 +2,25 @@
#include "filter.h"
#include "changes/actualization/construction/context.h"
-#include "changes/indexation.h"
-#include "changes/general_compaction.h"
#include "changes/cleanup_portions.h"
#include "changes/cleanup_tables.h"
+#include "changes/general_compaction.h"
+#include "changes/indexation.h"
#include "changes/ttl.h"
#include "portions/constructor.h"
#include <ydb/core/base/appdata.h>
-#include <ydb/core/tx/columnshard/common/limits.h>
-#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
-#include <ydb/core/tx/columnshard/columnshard_ttl.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/columnshard_ttl.h>
+#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/tiering/manager.h>
+#include <ydb/library/actors/core/monotonic_provider.h>
#include <ydb/library/conclusion/status.h>
#include <library/cpp/time_provider/time_provider.h>
-#include <ydb/library/actors/core/monotonic_provider.h>
#include <concepts>
@@ -32,14 +32,13 @@ TColumnEngineForLogs::TColumnEngineForLogs(
, StoragesManager(storagesManager)
, TabletId(tabletId)
, LastPortion(0)
- , LastGranule(0)
-{
+ , LastGranule(0) {
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, schema);
}
-TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager,
- const TSnapshot& snapshot, TIndexInfo&& schema)
+TColumnEngineForLogs::TColumnEngineForLogs(
+ ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, storagesManager))
, StoragesManager(storagesManager)
, TabletId(tabletId)
@@ -58,13 +57,14 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() {
return Counters;
}
-void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType,
- const TPortionInfo* exPortionInfo) {
+void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType, const TPortionInfo* exPortionInfo) {
if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD)) {
auto before = Counters.Active();
UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo);
auto after = Counters.Active();
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_stats_updated")("type", updateType)("path_id", portionInfo.GetPathId())("portion", portionInfo.GetPortionId())("before_size", before.Bytes)("after_size", after.Bytes)("before_rows", before.Rows)("after_rows", after.Rows);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_stats_updated")("type", updateType)("path_id", portionInfo.GetPathId())(
+ "portion", portionInfo.GetPortionId())("before_size", before.Bytes)("after_size", after.Bytes)("before_rows", before.Rows)(
+ "after_rows", after.Rows);
} else {
UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo);
}
@@ -89,31 +89,28 @@ TColumnEngineStats::TPortionsStats DeltaStats(const TPortionInfo& portionInfo) {
return deltaStats;
}
-void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo,
- EStatsUpdateType updateType,
- const TPortionInfo* exPortionInfo) const {
+void TColumnEngineForLogs::UpdatePortionStats(
+ TColumnEngineStats& engineStats, const TPortionInfo& portionInfo, EStatsUpdateType updateType, const TPortionInfo* exPortionInfo) const {
TColumnEngineStats::TPortionsStats deltaStats = DeltaStats(portionInfo);
Y_ABORT_UNLESS(!exPortionInfo || exPortionInfo->GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED);
Y_ABORT_UNLESS(portionInfo.GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED);
- TColumnEngineStats::TPortionsStats& srcStats = exPortionInfo
- ? (exPortionInfo->HasRemoveSnapshot()
- ? engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]
- : engineStats.StatsByType[exPortionInfo->GetMeta().Produced])
- : engineStats.StatsByType[portionInfo.GetMeta().Produced];
- TColumnEngineStats::TPortionsStats& stats = portionInfo.HasRemoveSnapshot()
- ? engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]
- : engineStats.StatsByType[portionInfo.GetMeta().Produced];
+ TColumnEngineStats::TPortionsStats& srcStats =
+ exPortionInfo ? (exPortionInfo->HasRemoveSnapshot() ? engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]
+ : engineStats.StatsByType[exPortionInfo->GetMeta().Produced])
+ : engineStats.StatsByType[portionInfo.GetMeta().Produced];
+ TColumnEngineStats::TPortionsStats& stats = portionInfo.HasRemoveSnapshot() ? engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]
+ : engineStats.StatsByType[portionInfo.GetMeta().Produced];
const bool isErase = updateType == EStatsUpdateType::ERASE;
const bool isAdd = updateType == EStatsUpdateType::ADD;
- if (isErase) { // PortionsToDrop
+ if (isErase) { // PortionsToDrop
stats -= deltaStats;
- } else if (isAdd) { // Load || AppendedPortions
+ } else if (isAdd) { // Load || AppendedPortions
stats += deltaStats;
- } else if (&srcStats != &stats || exPortionInfo) { // SwitchedPortions || PortionsToEvict
+ } else if (&srcStats != &stats || exPortionInfo) { // SwitchedPortions || PortionsToEvict
stats += deltaStats;
if (exPortionInfo) {
@@ -206,10 +203,10 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
NColumnShard::TLoadTimeSignals::TLoadTimer timer = SignalCounters.PortionsLoadingTimeCounters.StartGuard();
TMemoryProfileGuard g("TTxInit/LoadColumns/Portions");
if (!db.LoadPortions([&](TPortionInfoConstructor&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) {
- const TIndexInfo& indexInfo = portion.GetSchema(VersionedIndex)->GetIndexInfo();
- AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo));
- AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion)));
- })) {
+ const TIndexInfo& indexInfo = portion.GetSchema(VersionedIndex)->GetIndexInfo();
+ AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo));
+ AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion)));
+ })) {
timer.AddLoadingFail();
return false;
}
@@ -219,11 +216,10 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
NColumnShard::TLoadTimeSignals::TLoadTimer timer = SignalCounters.ColumnsLoadingTimeCounters.StartGuard();
TMemoryProfileGuard g("TTxInit/LoadColumns/Records");
TPortionInfo::TSchemaCursor schema(VersionedIndex);
- if (!db.LoadColumns([&](TPortionInfoConstructor&& portion, const TColumnChunkLoadContext& loadContext) {
- auto currentSchema = schema.GetSchema(portion);
- auto* constructor = constructors.MergeConstructor(std::move(portion));
- constructor->LoadRecord(currentSchema->GetIndexInfo(), loadContext);
- })) {
+ if (!db.LoadColumns([&](const TColumnChunkLoadContext& loadContext) {
+ auto* constructor = constructors.GetConstructorVerified(loadContext.GetPathId(), loadContext.GetPortionId());
+ constructor->LoadRecord(loadContext);
+ })) {
timer.AddLoadingFail();
return false;
}
@@ -231,11 +227,11 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
{
NColumnShard::TLoadTimeSignals::TLoadTimeSignals::TLoadTimer timer = SignalCounters.IndexesLoadingTimeCounters.StartGuard();
- TMemoryProfileGuard g("TTxInit/LoadColumns/Indexes");
+ TMemoryProfileGuard g("TTxInit/LoadIndexes/Indexes");
if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) {
- auto* constructor = constructors.GetConstructorVerified(pathId, portionId);
- constructor->LoadIndex(loadContext);
- })) {
+ auto* constructor = constructors.GetConstructorVerified(pathId, portionId);
+ constructor->LoadIndex(loadContext);
+ })) {
timer.AddLoadingFail();
return false;
};
@@ -246,8 +242,7 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
for (auto&& [granuleId, pathConstructors] : constructors) {
auto g = GetGranulePtrVerified(granuleId);
for (auto&& [portionId, constructor] : pathConstructors) {
- auto portion = *constructor.Build(false).GetPortionInfoPtr();
- g->UpsertPortionOnLoad(std::move(portion));
+ g->UpsertPortionOnLoad(constructor.Build(false).MutablePortionInfoPtr());
}
}
}
@@ -263,18 +258,18 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
auto callback = [&](ui32 id, ui64 value) {
switch (id) {
- case LAST_PORTION:
- LastPortion = value;
- break;
- case LAST_GRANULE:
- LastGranule = value;
- break;
- case LAST_PLAN_STEP:
- LastSnapshot = TSnapshot(value, LastSnapshot.GetTxId());
- break;
- case LAST_TX_ID:
- LastSnapshot = TSnapshot(LastSnapshot.GetPlanStep(), value);
- break;
+ case LAST_PORTION:
+ LastPortion = value;
+ break;
+ case LAST_GRANULE:
+ LastGranule = value;
+ break;
+ case LAST_PLAN_STEP:
+ LastSnapshot = TSnapshot(value, LastSnapshot.GetTxId());
+ break;
+ case LAST_TX_ID:
+ LastSnapshot = TSnapshot(LastSnapshot.GetPlanStep(), value);
+ break;
}
};
@@ -297,7 +292,6 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st
if (!data.GetRemove()) {
AFL_VERIFY(changes->PathToGranule.emplace(pathId, GetGranulePtrVerified(pathId)->GetBucketPositions()).second);
}
-
}
return changes;
@@ -313,7 +307,8 @@ ui64 TColumnEngineForLogs::GetCompactionPriority(const std::shared_ptr<NDataLock
}
}
-std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept {
+std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(
+ const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept {
AFL_VERIFY(dataLocksManager);
auto granule = GranulesStorage->GetGranuleForCompaction(dataLocksManager);
if (!granule) {
@@ -323,7 +318,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(cons
granule->OnStartCompaction();
auto changes = granule->GetOptimizationTask(granule, dataLocksManager);
if (!changes) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")("weight", granule->GetCompactionPriority().DebugString());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")(
+ "weight", granule->GetCompactionPriority().DebugString());
}
return changes;
}
@@ -351,8 +347,8 @@ std::shared_ptr<TCleanupTablesColumnEngineChanges> TColumnEngineForLogs::StartCl
return changes;
}
-std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::StartCleanupPortions(const TSnapshot& snapshot,
- const THashSet<ui64>& pathsToDrop, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept {
+std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::StartCleanupPortions(
+ const TSnapshot& snapshot, const THashSet<ui64>& pathsToDrop, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept {
AFL_VERIFY(dataLocksManager);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size());
auto changes = std::make_shared<TCleanupPortionsColumnEngineChanges>(StoragesManager);
@@ -423,8 +419,8 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
++it;
}
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")
- ("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size())("drop", portionsFromDrop)("skip", skipLocked);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())(
+ "portions_prepared", changes->PortionsToDrop.size())("drop", portionsFromDrop)("skip", skipLocked);
if (changes->PortionsToDrop.empty()) {
return nullptr;
@@ -433,8 +429,8 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
return changes;
}
-std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager,
- const ui64 memoryUsageLimit) noexcept {
+std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction,
+ const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept {
AFL_VERIFY(dataLocksManager);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size());
@@ -480,7 +476,8 @@ bool TColumnEngineForLogs::ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineC
return true;
}
-bool TColumnEngineForLogs::ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> /*indexChanges*/, const TSnapshot& snapshot) noexcept {
+bool TColumnEngineForLogs::ApplyChangesOnExecute(
+ IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> /*indexChanges*/, const TSnapshot& snapshot) noexcept {
db.WriteCounter(LAST_PORTION, LastPortion);
db.WriteCounter(LAST_GRANULE, LastGranule);
@@ -492,11 +489,11 @@ bool TColumnEngineForLogs::ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr
return true;
}
-void TColumnEngineForLogs::AppendPortion(const TPortionInfo& portionInfo) {
- auto granule = GetGranulePtrVerified(portionInfo.GetPathId());
- AFL_VERIFY(!granule->GetPortionOptional(portionInfo.GetPortionId()));
- UpdatePortionStats(portionInfo, EStatsUpdateType::ADD);
- granule->UpsertPortion(portionInfo);
+void TColumnEngineForLogs::AppendPortion(const TPortionInfo::TPtr& portionInfo) {
+ auto granule = GetGranulePtrVerified(portionInfo->GetPathId());
+ AFL_VERIFY(!granule->GetPortionOptional(portionInfo->GetPortionId()));
+ UpdatePortionStats(*portionInfo, EStatsUpdateType::ADD);
+ granule->AppendPortion(portionInfo);
}
bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool updateStats) {
@@ -553,7 +550,8 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
return out;
}
-void TColumnEngineForLogs::OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) {
+void TColumnEngineForLogs::OnTieringModified(
+ const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) {
if (!ActualizationStarted) {
for (auto&& i : GranulesStorage->GetTables()) {
i.second->StartActualizationIndex();
@@ -565,12 +563,10 @@ void TColumnEngineForLogs::OnTieringModified(const std::shared_ptr<NColumnShard:
THashMap<ui64, TTiering> tierings = manager->GetTiering();
ttl.AddTtls(tierings);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")
- ("new_count_tierings", tierings.size())
- ("new_count_ttls", ttl.PathsCount());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")("new_count_tierings", tierings.size())(
+ "new_count_ttls", ttl.PathsCount());
// some string
-
if (pathId) {
auto g = GetGranulePtrVerified(*pathId);
auto it = tierings.find(*pathId);
@@ -599,4 +595,4 @@ void TColumnEngineForLogs::DoRegisterTable(const ui64 pathId) {
}
}
-} // namespace NKikimr::NOlap
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 6173bd43834..aa0f41a7cde 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -192,14 +192,14 @@ public:
template <class TModifier>
void ModifyPortionOnComplete(const TPortionInfo::TConstPtr& portion, const TModifier& modifier) {
- auto exPortion = *portion;
+ auto exPortion = portion->MakeCopy();
AFL_VERIFY(portion);
auto granule = GetGranulePtrVerified(portion->GetPathId());
granule->ModifyPortionOnComplete(portion, modifier);
UpdatePortionStats(*portion, EStatsUpdateType::DEFAULT, &exPortion);
}
- void AppendPortion(const TPortionInfo& portionInfo);
+ void AppendPortion(const TPortionInfo::TPtr& portionInfo);
private:
TVersionedIndex VersionedIndex;
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index 11ad657cd6b..fc63341d9d4 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -98,7 +98,7 @@ void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRe
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortionId(), row.Chunk).Delete();
}
-bool TDbWrapper::LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) {
+bool TDbWrapper::LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) {
NIceDb::TNiceDb db(Database);
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto rowset = db.Table<IndexColumns>().Prefix(0).Select();
@@ -107,15 +107,8 @@ bool TDbWrapper::LoadColumns(const std::function<void(NOlap::TPortionInfoConstru
}
while (!rowset.EndOfSet()) {
- NOlap::TSnapshot minSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>());
- NOlap::TSnapshot removeSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>());
-
- NOlap::TPortionInfoConstructor constructor(rowset.GetValue<IndexColumns::PathId>(), rowset.GetValue<IndexColumns::Portion>());
- constructor.SetMinSnapshotDeprecated(minSnapshot);
- constructor.SetRemoveSnapshot(removeSnapshot);
-
NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector);
- callback(std::move(constructor), chunkLoadContext);
+ callback(chunkLoadContext);
if (!rowset.Next()) {
return false;
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h
index 50958b6fca2..303dc75efd2 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.h
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.h
@@ -41,7 +41,7 @@ public:
virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) = 0;
virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0;
- virtual bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) = 0;
+ virtual bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) = 0;
virtual void WritePortion(const NOlap::TPortionInfo& portion) = 0;
virtual void ErasePortion(const NOlap::TPortionInfo& portion) = 0;
@@ -78,7 +78,7 @@ public:
void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) override;
void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
- bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) override;
+ bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) override;
virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.cpp b/ydb/core/tx/columnshard/engines/portions/constructor.cpp
index 273edd597c9..188a415a536 100644
--- a/ydb/core/tx/columnshard/engines/portions/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/constructor.cpp
@@ -12,6 +12,22 @@ namespace NKikimr::NOlap {
TPortionDataAccessor TPortionInfoConstructor::Build(const bool needChunksNormalization) {
AFL_VERIFY(!Constructed);
Constructed = true;
+
+ MetaConstructor.ColumnRawBytes = 0;
+ MetaConstructor.ColumnBlobBytes = 0;
+ MetaConstructor.IndexRawBytes = 0;
+ MetaConstructor.IndexBlobBytes = 0;
+
+ MetaConstructor.RecordsCount = GetRecordsCount();
+ for (auto&& r : Records) {
+ *MetaConstructor.ColumnRawBytes += r.GetMeta().GetRawBytes();
+ *MetaConstructor.ColumnBlobBytes += r.GetBlobRange().GetSize();
+ }
+ for (auto&& r : Indexes) {
+ *MetaConstructor.IndexRawBytes += r.GetRawBytes();
+ *MetaConstructor.IndexBlobBytes += r.GetDataSize();
+ }
+
std::shared_ptr<TPortionInfo> result(new TPortionInfo(MetaConstructor.Build()));
AFL_VERIFY(PathId);
result->PathId = PathId;
@@ -97,7 +113,6 @@ TPortionDataAccessor TPortionInfoConstructor::Build(const bool needChunksNormali
result->Records.shrink_to_fit();
result->BlobIds = std::move(BlobIds);
result->BlobIds.shrink_to_fit();
- result->Precalculate();
return TPortionDataAccessor(result);
}
@@ -111,13 +126,9 @@ ISnapshotSchema::TPtr TPortionInfoConstructor::GetSchema(const TVersionedIndex&
return index.GetSchema(*MinSnapshotDeprecated);
}
-void TPortionInfoConstructor::LoadRecord(const TIndexInfo& indexInfo, const TColumnChunkLoadContext& loadContext) {
+void TPortionInfoConstructor::LoadRecord(const TColumnChunkLoadContext& loadContext) {
TColumnRecord rec(RegisterBlobId(loadContext.GetBlobRange().GetBlobId()), loadContext);
Records.push_back(std::move(rec));
-
- if (loadContext.GetPortionMeta()) {
- AFL_VERIFY(MetaConstructor.LoadMetadata(*loadContext.GetPortionMeta(), indexInfo));
- }
}
void TPortionInfoConstructor::LoadIndex(const TIndexChunkLoadContext& loadContext) {
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.h b/ydb/core/tx/columnshard/engines/portions/constructor.h
index e42e92c190d..90e44218c1d 100644
--- a/ydb/core/tx/columnshard/engines/portions/constructor.h
+++ b/ydb/core/tx/columnshard/engines/portions/constructor.h
@@ -276,9 +276,10 @@ public:
SetRemoveSnapshot(TSnapshot(planStep, txId));
}
- void LoadRecord(const TIndexInfo& indexInfo, const TColumnChunkLoadContext& loadContext);
+ void LoadRecord(const TColumnChunkLoadContext& loadContext);
ui32 GetRecordsCount() const {
+ AFL_VERIFY(Records.size());
ui32 result = 0;
std::optional<ui32> columnIdFirst;
for (auto&& i : Records) {
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp
index 948a199c960..dbb22213e48 100644
--- a/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp
@@ -42,16 +42,20 @@ TPortionMeta TPortionMetaConstructor::Build() {
AFL_VERIFY(FirstAndLastPK);
AFL_VERIFY(RecordSnapshotMin);
AFL_VERIFY(RecordSnapshotMax);
- AFL_VERIFY(CompactionLevel);
TPortionMeta result(*FirstAndLastPK, *RecordSnapshotMin, *RecordSnapshotMax);
if (TierName) {
result.TierName = *TierName;
}
- result.CompactionLevel = *CompactionLevel;
- AFL_VERIFY(DeletionsCount);
- result.DeletionsCount = *DeletionsCount;
- AFL_VERIFY(Produced);
- result.Produced = *Produced;
+ result.CompactionLevel = *TValidator::CheckNotNull(CompactionLevel);
+ result.DeletionsCount = *TValidator::CheckNotNull(DeletionsCount);
+ result.Produced = *TValidator::CheckNotNull(Produced);
+
+ result.RecordsCount = *TValidator::CheckNotNull(RecordsCount);
+ result.ColumnRawBytes = *TValidator::CheckNotNull(ColumnRawBytes);
+ result.ColumnBlobBytes = *TValidator::CheckNotNull(ColumnBlobBytes);
+ result.IndexRawBytes = *TValidator::CheckNotNull(IndexRawBytes);
+ result.IndexBlobBytes = *TValidator::CheckNotNull(IndexBlobBytes);
+
return result;
}
@@ -69,6 +73,11 @@ bool TPortionMetaConstructor::LoadMetadata(const NKikimrTxColumnShard::TIndexPor
DeletionsCount = 0;
}
CompactionLevel = portionMeta.GetCompactionLevel();
+ RecordsCount = TValidator::CheckNotNull(portionMeta.GetRecordsCount());
+ ColumnRawBytes = TValidator::CheckNotNull(portionMeta.GetColumnRawBytes());
+ ColumnBlobBytes = TValidator::CheckNotNull(portionMeta.GetColumnBlobBytes());
+ IndexRawBytes = portionMeta.GetIndexRawBytes();
+ IndexBlobBytes = portionMeta.GetIndexBlobBytes();
if (portionMeta.GetIsInserted()) {
Produced = TPortionMeta::EProduced::INSERTED;
} else if (portionMeta.GetIsCompacted()) {
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h
index 4c55771fdf4..2e36e389f6e 100644
--- a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h
@@ -16,6 +16,13 @@ private:
std::optional<TSnapshot> RecordSnapshotMax;
std::optional<NPortion::EProduced> Produced;
std::optional<ui64> CompactionLevel;
+
+ std::optional<ui32> RecordsCount;
+ std::optional<ui64> ColumnRawBytes;
+ std::optional<ui32> ColumnBlobBytes;
+ std::optional<ui32> IndexRawBytes;
+ std::optional<ui32> IndexBlobBytes;
+
std::optional<ui32> DeletionsCount;
friend class TPortionInfoConstructor;
void FillMetaInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const ui32 deletionsCount, const std::optional<NArrow::TMinMaxSpecialKeys>& snapshotKeys, const TIndexInfo& indexInfo);
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp
index 7019b577057..0c079d22cf2 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp
@@ -13,6 +13,11 @@ NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const {
portionMeta.SetTierName(TierName);
portionMeta.SetCompactionLevel(CompactionLevel);
portionMeta.SetDeletionsCount(DeletionsCount);
+ portionMeta.SetRecordsCount(TValidator::CheckNotNull(RecordsCount));
+ portionMeta.SetColumnRawBytes(TValidator::CheckNotNull(ColumnRawBytes));
+ portionMeta.SetColumnBlobBytes(TValidator::CheckNotNull(ColumnBlobBytes));
+ portionMeta.SetIndexRawBytes(IndexRawBytes);
+ portionMeta.SetIndexBlobBytes(IndexBlobBytes);
switch (Produced) {
case TPortionMeta::EProduced::UNSPECIFIED:
Y_ABORT_UNLESS(false);
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h
index e45156b5d52..a7cc849f94d 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/meta.h
@@ -17,6 +17,12 @@ private:
YDB_READONLY_DEF(TString, TierName);
YDB_READONLY(ui32, DeletionsCount, 0);
YDB_READONLY(ui32, CompactionLevel, 0);
+ YDB_READONLY(ui32, RecordsCount, 0);
+ YDB_READONLY(ui64, ColumnRawBytes, 0);
+ YDB_READONLY(ui32, ColumnBlobBytes, 0);
+ YDB_READONLY(ui32, IndexRawBytes, 0);
+ YDB_READONLY(ui32, IndexBlobBytes, 0);
+
friend class TPortionMetaConstructor;
friend class TPortionInfo;
TPortionMeta(NArrow::TFirstLastSpecialKeys& pk, const TSnapshot& min, const TSnapshot& max)
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index f4a9be0d0f8..419c857a67a 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -11,13 +11,11 @@
namespace NKikimr::NOlap {
ui64 TPortionInfo::GetColumnRawBytes() const {
- AFL_VERIFY(Precalculated);
- return PrecalculatedColumnRawBytes;
+ return GetMeta().GetColumnRawBytes();
}
ui64 TPortionInfo::GetColumnBlobBytes() const {
- AFL_VERIFY(Precalculated);
- return PrecalculatedColumnBlobBytes;
+ return GetMeta().GetColumnBlobBytes();
}
TString TPortionInfo::DebugString(const bool withDetails) const {
@@ -109,7 +107,6 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat
}
Indexes.emplace_back(std::move(parse.DetachResult()));
}
- Precalculate();
return TConclusionStatus::Success();
}
@@ -170,33 +167,6 @@ NSplitter::TEntityGroups TPortionInfo::GetEntityGroupsByStorageId(
}
}
-void TPortionInfo::Precalculate() {
- AFL_VERIFY(!Precalculated);
- Precalculated = true;
- {
- PrecalculatedColumnRawBytes = 0;
- PrecalculatedColumnBlobBytes = 0;
- PrecalculatedRecordsCount = 0;
- const auto aggr = [&](const TColumnRecord& r) {
- PrecalculatedColumnRawBytes += r.GetMeta().GetRawBytes();
- PrecalculatedColumnBlobBytes += r.BlobRange.GetSize();
- if (r.GetColumnId() == Records.front().GetColumnId()) {
- PrecalculatedRecordsCount += r.GetMeta().GetRecordsCount();
- }
- };
- TPortionDataAccessor::AggregateIndexChunksData(aggr, Records, nullptr, true);
- }
- {
- PrecalculatedIndexRawBytes = 0;
- PrecalculatedIndexBlobBytes = 0;
- const auto aggr = [&](const TIndexChunk& r) {
- PrecalculatedIndexRawBytes += r.GetRawBytes();
- PrecalculatedIndexBlobBytes += r.GetDataSize();
- };
- TPortionDataAccessor::AggregateIndexChunksData(aggr, Indexes, nullptr, true);
- }
-}
-
void TPortionInfo::SaveMetaToDatabase(IDbWrapper& db) const {
FullValidation();
db.WritePortion(*this);
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index f2dc79a8c92..67af582c1a0 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -65,14 +65,8 @@ private:
friend class TPortionDataAccessor;
friend class TPortionInfoConstructor;
- ui64 PrecalculatedColumnRawBytes = 0;
- ui64 PrecalculatedColumnBlobBytes = 0;
- ui64 PrecalculatedRecordsCount = 0;
- ui64 PrecalculatedIndexBlobBytes = 0;
- ui64 PrecalculatedIndexRawBytes = 0;
- bool Precalculated = false;
-
- void Precalculate();
+ TPortionInfo(const TPortionInfo&) = default;
+ TPortionInfo& operator=(const TPortionInfo&) = default;
TPortionInfo(TPortionMeta&& meta)
: Meta(std::move(meta)) {
@@ -107,8 +101,15 @@ private:
TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TPortionInfo& proto);
public:
+ TPortionInfo(TPortionInfo&&) = default;
+ TPortionInfo& operator=(TPortionInfo&&) = default;
+
void SaveMetaToDatabase(IDbWrapper& db) const;
+ TPortionInfo MakeCopy() const {
+ return *this;
+ }
+
const std::vector<TUnifiedBlobId>& GetBlobIds() const {
return BlobIds;
}
@@ -433,18 +434,15 @@ public:
ISnapshotSchema::TPtr GetSchema(const TVersionedIndex& index) const;
ui32 GetRecordsCount() const {
- AFL_VERIFY(Precalculated);
- return PrecalculatedRecordsCount;
+ return GetMeta().GetRecordsCount();
}
ui64 GetIndexBlobBytes() const noexcept {
- AFL_VERIFY(Precalculated);
- return PrecalculatedIndexBlobBytes;
+ return GetMeta().GetIndexBlobBytes();
}
ui64 GetIndexRawBytes() const noexcept {
- AFL_VERIFY(Precalculated);
- return PrecalculatedIndexRawBytes;
+ return GetMeta().GetIndexRawBytes();
}
ui64 GetColumnRawBytes() const;
diff --git a/ydb/core/tx/columnshard/engines/protos/portion_info.proto b/ydb/core/tx/columnshard/engines/protos/portion_info.proto
index b62f22790ce..5ecfa7608da 100644
--- a/ydb/core/tx/columnshard/engines/protos/portion_info.proto
+++ b/ydb/core/tx/columnshard/engines/protos/portion_info.proto
@@ -20,6 +20,11 @@ message TIndexPortionMeta {
optional TSnapshot RecordSnapshotMax = 8;
optional uint32 DeletionsCount = 10;
optional uint64 CompactionLevel = 11 [default = 0];
+ optional uint32 RecordsCount = 12;
+ optional uint64 ColumnRawBytes = 13;
+ optional uint32 ColumnBlobBytes = 14;
+ optional uint32 IndexBlobBytes = 15;
+ optional uint64 IndexRawBytes = 16;
}
message TIndexColumnMeta {
@@ -27,5 +32,5 @@ message TIndexColumnMeta {
optional uint32 RawBytes = 2;
optional NKikimrSSA.TProgram.TConstant MinValue = 3;
optional NKikimrSSA.TProgram.TConstant MaxValue = 4;
- optional TIndexPortionMeta PortionMeta = 5; // First PK column could contain portion info
+ optional TIndexPortionMeta PortionMeta = 5[deprecated = true]; // First PK column could contain portion info
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
index c44a42b1c94..78a4a09d230 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
@@ -9,21 +9,16 @@
namespace NKikimr::NOlap {
-void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("path_id", GetPathId());
- auto it = Portions.find(info.GetPortionId());
- AFL_VERIFY(info.GetPathId() == GetPathId())("event", "incompatible_granule")("portion", info.DebugString())("path_id", GetPathId());
+void TGranuleMeta::AppendPortion(const TPortionInfo::TPtr& info) {
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info->DebugString())("path_id", GetPathId());
+ auto it = Portions.find(info->GetPortionId());
+ AFL_VERIFY(info->GetPathId() == GetPathId())("event", "incompatible_granule")("portion", info->DebugString())("path_id", GetPathId());
- AFL_VERIFY(info.ValidSnapshotInfo())("event", "incorrect_portion_snapshots")("portion", info.DebugString());
+ AFL_VERIFY(info->ValidSnapshotInfo())("event", "incorrect_portion_snapshots")("portion", info->DebugString());
- if (it == Portions.end()) {
- OnBeforeChangePortion(nullptr);
- auto portionNew = std::make_shared<TPortionInfo>(info);
- it = Portions.emplace(portionNew->GetPortionId(), portionNew).first;
- } else {
- OnBeforeChangePortion(it->second);
- it->second = std::make_shared<TPortionInfo>(info);
- }
+ AFL_VERIFY(it == Portions.end());
+ OnBeforeChangePortion(nullptr);
+ it = Portions.emplace(info->GetPortionId(), info).first;
OnAfterChangePortion(it->second, nullptr);
}
@@ -138,17 +133,14 @@ TGranuleMeta::TGranuleMeta(
ActualizationIndex = std::make_shared<NActualizer::TGranuleActualizationIndex>(PathId, versionedIndex);
}
-std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(TPortionInfo&& portion) {
- if (portion.HasInsertWriteId() && !portion.HasCommitSnapshot()) {
- const TInsertWriteId insertWriteId = portion.GetInsertWriteIdVerified();
- auto emplaceInfo = InsertedPortions.emplace(insertWriteId, std::make_shared<TPortionInfo>(std::move(portion)));
- AFL_VERIFY(emplaceInfo.second);
- return emplaceInfo.first->second;
+void TGranuleMeta::UpsertPortionOnLoad(const std::shared_ptr<TPortionInfo>&& portion) {
+ if (portion->HasInsertWriteId() && !portion->HasCommitSnapshot()) {
+ const TInsertWriteId insertWriteId = portion->GetInsertWriteIdVerified();
+ AFL_VERIFY(InsertedPortions.emplace(insertWriteId, portion).second);
+ AFL_VERIFY(!Portions.contains(portion->GetPortionId()));
} else {
- auto portionId = portion.GetPortionId();
- auto emplaceInfo = Portions.emplace(portionId, std::make_shared<TPortionInfo>(std::move(portion)));
- AFL_VERIFY(emplaceInfo.second);
- return emplaceInfo.first->second;
+ auto portionId = portion->GetPortionId();
+ AFL_VERIFY(Portions.emplace(portionId, portion).second);
}
}
@@ -182,7 +174,7 @@ void TGranuleMeta::ResetOptimizer(const std::shared_ptr<NStorageOptimizer::IOpti
void TGranuleMeta::CommitPortionOnComplete(const TInsertWriteId insertWriteId, IColumnEngine& engine) {
auto it = InsertedPortions.find(insertWriteId);
AFL_VERIFY(it != InsertedPortions.end());
- (static_cast<TColumnEngineForLogs&>(engine)).AppendPortion(*it->second);
+ (static_cast<TColumnEngineForLogs&>(engine)).AppendPortion(it->second);
InsertedPortions.erase(it);
}
@@ -195,7 +187,7 @@ void TGranuleMeta::CommitImmediateOnExecute(
}
void TGranuleMeta::CommitImmediateOnComplete(const std::shared_ptr<TPortionInfo> portion, IColumnEngine& engine) {
- (static_cast<TColumnEngineForLogs&>(engine)).AppendPortion(*portion);
+ (static_cast<TColumnEngineForLogs&>(engine)).AppendPortion(portion);
}
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.h b/ydb/core/tx/columnshard/engines/storage/granule/granule.h
index acf6d4b3418..c9686573e2f 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.h
@@ -134,12 +134,6 @@ private:
void OnAdditiveSummaryChange() const;
YDB_READONLY(TMonotonic, LastCompactionInstant, TMonotonic::Zero());
-public:
- void RefreshTiering(const std::optional<TTiering>& tiering) {
- NActualizer::TAddExternalContext context(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now(), Portions);
- ActualizationIndex->RefreshTiering(tiering, context);
- }
-
TConclusion<std::shared_ptr<TPortionInfo>> GetInnerPortion(const TPortionInfo::TConstPtr& portion) const {
if (!portion) {
return TConclusionStatus::Fail("empty input portion pointer");
@@ -155,12 +149,18 @@ public:
return it->second;
}
+public:
+ void RefreshTiering(const std::optional<TTiering>& tiering) {
+ NActualizer::TAddExternalContext context(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now(), Portions);
+ ActualizationIndex->RefreshTiering(tiering, context);
+ }
+
template <class TModifier>
void ModifyPortionOnExecute(
IDbWrapper& wrapper, const TPortionInfo::TConstPtr& portion, const TModifier& modifier, const ui32 firstPKColumnId) const {
const auto innerPortion = GetInnerPortion(portion).DetachResult();
AFL_VERIFY((ui64)innerPortion.get() == (ui64)portion.get());
- auto copy = *innerPortion;
+ auto copy = innerPortion->MakeCopy();
modifier(copy);
TPortionDataAccessor(std::make_shared<TPortionInfo>(std::move(copy))).SaveToDatabase(wrapper, firstPKColumnId, false);
}
@@ -298,7 +298,7 @@ public:
void OnCompactionFailed(const TString& reason);
void OnCompactionFinished();
- void UpsertPortion(const TPortionInfo& info);
+ void AppendPortion(const TPortionInfo::TPtr& info);
TString DebugString() const {
return TStringBuilder() << "(granule:" << GetPathId() << ";"
@@ -308,7 +308,7 @@ public:
<< ")";
}
- std::shared_ptr<TPortionInfo> UpsertPortionOnLoad(TPortionInfo&& portion);
+ void UpsertPortionOnLoad(const std::shared_ptr<TPortionInfo>&& portion);
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const {
return Portions;
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp
index cac07cdd8ec..a3bb6bfd645 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp
@@ -50,7 +50,7 @@ public:
}
void EraseColumn(const TPortionInfo&, const TColumnRecord&) override {
}
- bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>&) override {
+ bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>&) override {
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
index 1cabcd07dd1..817aef1ea22 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
@@ -88,13 +88,21 @@ public:
return true;
}
- virtual void WritePortion(const NOlap::TPortionInfo& /*portion*/) override {
-
+ virtual void WritePortion(const NOlap::TPortionInfo& portion) override {
+ auto it = Portions.find(portion.GetPortionId());
+ if (it == Portions.end()) {
+ Portions.emplace(portion.GetPortionId(), portion.MakeCopy());
+ } else {
+ it->second = portion.MakeCopy();
+ }
}
- virtual void ErasePortion(const NOlap::TPortionInfo& /*portion*/) override {
-
+ virtual void ErasePortion(const NOlap::TPortionInfo& portion) override {
+ AFL_VERIFY(Portions.erase(portion.GetPortionId()));
}
- virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override {
+ virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) override {
+ for (auto&& i : Portions) {
+ callback(NOlap::TPortionInfoConstructor(i.second, false, true), i.second.GetMeta().SerializeToProto());
+ }
return true;
}
@@ -105,7 +113,8 @@ public:
}
auto& data = Indices[0].Columns[portion.GetPathId()];
- NOlap::TColumnChunkLoadContext loadContext(row.GetAddress(), portion.RestoreBlobRange(row.BlobRange), rowProto);
+ NOlap::TColumnChunkLoadContext loadContext(
+ portion.GetPathId(), portion.GetPortionId(), row.GetAddress(), portion.RestoreBlobRange(row.BlobRange), rowProto);
auto itInsertInfo = LoadContexts[portion.GetAddress()].emplace(row.GetAddress(), loadContext);
if (!itInsertInfo.second) {
itInsertInfo.first->second = loadContext;
@@ -153,7 +162,7 @@ public:
portionLocal.MutableRecords().swap(filtered);
}
- bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) override {
+ bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) override {
auto& columns = Indices[0].Columns;
for (auto& [pathId, portions] : columns) {
for (auto& [portionId, portionLocal] : portions) {
@@ -163,7 +172,7 @@ public:
auto itContextLoader = LoadContexts[copy.GetAddress()].find(rec.GetAddress());
Y_ABORT_UNLESS(itContextLoader != LoadContexts[copy.GetAddress()].end());
auto address = copy.GetAddress();
- callback(std::move(copy), itContextLoader->second);
+ callback(itContextLoader->second);
LoadContexts[address].erase(itContextLoader);
}
}
@@ -192,6 +201,7 @@ private:
THashMap<TInsertWriteId, TInsertedData> Inserted;
THashMap<ui64, TSet<TCommittedData>> Committed;
THashMap<TInsertWriteId, TInsertedData> Aborted;
+ THashMap<ui64, NOlap::TPortionInfo> Portions;
THashMap<ui32, TIndex> Indices;
};
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
index e75099ecd9b..4cdd52799ee 100644
--- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
@@ -52,13 +52,13 @@ public:
enum class ENormalizerSequentialId: ui32 {
Granules = 1,
Chunks,
- PortionsCleaner,
TablesCleaner,
- PortionsMetadata,
CleanGranuleId,
EmptyPortionsCleaner,
CleanInsertionDedup,
GCCountersNormalizer,
+ RestorePortionFromChunks,
+ SyncPortionFromChunks,
MAX
};
diff --git a/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp
index c7a2cdf4d30..9772beef418 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp
@@ -29,7 +29,7 @@ public:
AFL_VERIFY(!!schema)("portion_id", portionInfo.GetPortionInfo().GetPortionId());
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "portion_removed_as_broken")(
"portion_id", portionInfo.GetPortionInfo().GetAddress().DebugString());
- auto copy = portionInfo.GetPortionInfo();
+ auto copy = portionInfo.GetPortionInfo().MakeCopy();
copy.SetRemoveSnapshot(TSnapshot(1, 1));
copy.SaveMetaToDatabase(db);
}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp
new file mode 100644
index 00000000000..5f14318750d
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp
@@ -0,0 +1,184 @@
+#include "chunks_actualization.h"
+#include "normalizer.h"
+
+#include <ydb/core/formats/arrow/size_calcer.h>
+#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/tables_manager.h>
+
+namespace NKikimr::NOlap::NSyncChunksWithPortions1 {
+
+// A portion load context bundled with the column-chunk and index-chunk load contexts
+// that were collected for that portion.
+class TPatchItem {
+private:
+    TPortionLoadContext PortionInfo;
+    YDB_READONLY_DEF(std::vector<TColumnChunkLoadContext>, Records);
+    YDB_READONLY_DEF(std::vector<TIndexChunkLoadContext>, Indexes);
+
+public:
+    // Takes ownership of the portion context and of both chunk lists.
+    TPatchItem(TPortionLoadContext&& portion, std::vector<TColumnChunkLoadContext>&& records,
+        std::vector<TIndexChunkLoadContext>&& indexes)
+        : PortionInfo(std::move(portion))
+        , Records(std::move(records))
+        , Indexes(std::move(indexes)) {
+    }
+
+    // Read-only access to the stored portion context.
+    const TPortionLoadContext& GetPortionInfo() const {
+        return PortionInfo;
+    }
+};
+
+// Normalizer change set: for every patched portion recomputes the aggregated row and byte
+// counters from its chunks and stores them in the portion's metadata row.
+class TChanges: public INormalizerChanges {
+private:
+    std::vector<TPatchItem> Patches;
+
+public:
+    TChanges(std::vector<TPatchItem>&& patches)
+        : Patches(std::move(patches)) {
+    }
+
+    // For each patch:
+    //  - sums raw/blob bytes over the column chunks;
+    //  - counts rows over the chunks that share the column id of the first record;
+    //  - sums raw/blob bytes over the index chunks into the index counters;
+    //  - writes all five totals into a copy of the portion meta proto and updates
+    //    IndexPortions::Metadata for the (pathId, portionId) key.
+    // Always returns true.
+    virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
+        using namespace NColumnShard;
+        NIceDb::TNiceDb db(txc.DB);
+        for (auto&& i : Patches) {
+            auto meta = i.GetPortionInfo().GetMetaProto();
+            ui32 recordsCount = 0;
+            ui64 columnRawBytes = 0;
+            ui64 indexRawBytes = 0;
+            ui32 columnBlobBytes = 0;
+            ui32 indexBlobBytes = 0;
+
+            for (auto&& c : i.GetRecords()) {
+                columnRawBytes += c.GetMetaProto().GetRawBytes();
+                columnBlobBytes += c.GetBlobRange().GetSize();
+                // Rows are counted once: only chunks of the first record's column contribute.
+                if (i.GetRecords().front().GetAddress().GetColumnId() == c.GetAddress().GetColumnId()) {
+                    recordsCount += c.GetMetaProto().GetNumRows();
+                }
+            }
+            // Index chunks must feed the index counters; adding them to the column counters
+            // would leave IndexRawBytes/IndexBlobBytes at zero and inflate the column totals.
+            for (auto&& c : i.GetIndexes()) {
+                indexRawBytes += c.GetRawBytes();
+                indexBlobBytes += c.GetDataSize();
+            }
+            meta.SetRecordsCount(recordsCount);
+            meta.SetColumnRawBytes(columnRawBytes);
+            meta.SetColumnBlobBytes(columnBlobBytes);
+            meta.SetIndexRawBytes(indexRawBytes);
+            meta.SetIndexBlobBytes(indexBlobBytes);
+
+            db.Table<Schema::IndexPortions>()
+                .Key(i.GetPortionInfo().GetPathId(), i.GetPortionInfo().GetPortionId())
+                .Update(NIceDb::TUpdate<Schema::IndexPortions::Metadata>(meta.SerializeAsString()));
+        }
+
+        return true;
+    }
+
+    // Number of portions patched by this change set.
+    virtual ui64 GetSize() const override {
+        return Patches.size();
+    }
+};
+
+// Reads all rows of IndexPortions, IndexColumns and IndexIndexes, groups the chunk rows by
+// portion id and builds normalizer tasks (at most 100 portions per task) for the portions whose
+// stored meta proto does not report a non-zero RecordsCount.
+// Returns a failed status ("Not ready") when a precharge, a select or a rowset step is not ready.
+TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
+    const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
+    using namespace NColumnShard;
+    NIceDb::TNiceDb db(txc.DB);
+
+    // Non-short-circuit '&' on purpose: every table is precharged even if an earlier one is not ready.
+    bool ready = true;
+    ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
+    ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme());
+    ready = ready & Schema::Precharge<Schema::IndexIndexes>(db, txc.DB.GetScheme());
+    if (!ready) {
+        return TConclusionStatus::Fail("Not ready");
+    }
+
+    THashMap<ui64, TPortionLoadContext> dbPortions;
+    THashMap<ui64, std::vector<TColumnChunkLoadContext>> recordsByPortion;
+    THashMap<ui64, std::vector<TIndexChunkLoadContext>> indexesByPortion;
+
+    // Portions: one entry per portion id; a duplicate id aborts.
+    {
+        auto rowset = db.Table<Schema::IndexPortions>().Select();
+        if (!rowset.IsReady()) {
+            return TConclusionStatus::Fail("Not ready");
+        }
+
+        while (!rowset.EndOfSet()) {
+            TPortionLoadContext portion(rowset);
+            AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second);
+
+            if (!rowset.Next()) {
+                return TConclusionStatus::Fail("Not ready");
+            }
+        }
+    }
+
+    // Column chunks: every chunk must reference a known portion.
+    {
+        auto rowset = db.Table<Schema::IndexColumns>().Select();
+        if (!rowset.IsReady()) {
+            return TConclusionStatus::Fail("Not ready");
+        }
+
+        while (!rowset.EndOfSet()) {
+            TColumnChunkLoadContext chunk(rowset, &DsGroupSelector);
+            const ui64 portionId = chunk.GetPortionId();
+            AFL_VERIFY(dbPortions.contains(portionId));
+            recordsByPortion[portionId].emplace_back(std::move(chunk));
+
+            if (!rowset.Next()) {
+                return TConclusionStatus::Fail("Not ready");
+            }
+        }
+    }
+
+    // Index chunks: every chunk must reference a known portion.
+    {
+        auto rowset = db.Table<Schema::IndexIndexes>().Select();
+        if (!rowset.IsReady()) {
+            return TConclusionStatus::Fail("Not ready");
+        }
+
+        while (!rowset.EndOfSet()) {
+            TIndexChunkLoadContext chunk(rowset, &DsGroupSelector);
+            const ui64 portionId = chunk.GetPortionId();
+            AFL_VERIFY(dbPortions.contains(portionId));
+            indexesByPortion[portionId].emplace_back(std::move(chunk));
+
+            if (!rowset.Next()) {
+                return TConclusionStatus::Fail("Not ready");
+            }
+        }
+    }
+    // Each portion is required to have at least one column chunk.
+    AFL_VERIFY(dbPortions.size() == recordsByPortion.size())("portions", dbPortions.size())("records", recordsByPortion.size());
+
+    for (auto&& i : indexesByPortion) {
+        AFL_VERIFY(dbPortions.contains(i.first));
+    }
+
+    std::vector<INormalizerTask::TPtr> tasks;
+    if (dbPortions.empty()) {
+        return tasks;
+    }
+
+    std::vector<TPatchItem> package;
+
+    for (auto&& [_, portion] : dbPortions) {
+        // Portions that already carry a non-zero RecordsCount are left untouched.
+        if (portion.GetMetaProto().GetRecordsCount()) {
+            continue;
+        }
+        auto itRecords = recordsByPortion.find(portion.GetPortionId());
+        AFL_VERIFY(itRecords != recordsByPortion.end());
+        // indexesByPortion is not read again after this loop, so the chunk list is moved out
+        // instead of copied; a portion without index chunks gets an empty list.
+        std::vector<TIndexChunkLoadContext> indexes;
+        if (auto itIndexes = indexesByPortion.find(portion.GetPortionId()); itIndexes != indexesByPortion.end()) {
+            indexes = std::move(itIndexes->second);
+        }
+        package.emplace_back(std::move(portion), std::move(itRecords->second), std::move(indexes));
+        // Flush a full package of 100 patches into its own task.
+        if (package.size() == 100) {
+            std::vector<TPatchItem> local;
+            local.swap(package);
+            tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local))));
+        }
+    }
+
+    // Remaining patches (fewer than 100) form the last task.
+    if (package.size() > 0) {
+        tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package))));
+    }
+    return tasks;
+}
+
+} // namespace NKikimr::NOlap::NSyncChunksWithPortions1
diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h
new file mode 100644
index 00000000000..71b1b1fd44f
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/defs.h>
+#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
+
+namespace NKikimr::NColumnShard {
+class TTablesManager;
+}
+
+namespace NKikimr::NOlap::NSyncChunksWithPortions1 {
+
+// Normalizer component identified by ENormalizerSequentialId::SyncPortionFromChunks and
+// registered in the component factory under that id's string form.
+class TNormalizer: public TNormalizationController::INormalizerComponent {
+public:
+    class TNormalizerResult;
+
+    // Name used as the factory registration key.
+    static TString GetClassNameStatic() {
+        return ::ToString(ENormalizerSequentialId::SyncPortionFromChunks);
+    }
+
+    // Must stay below GetClassNameStatic(): the initializer calls it.
+    static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator =
+        INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic());
+
+    TNormalizer(const TNormalizationController::TInitContext& info)
+        : DsGroupSelector(info.GetStorageInfo()) {
+    }
+
+    TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+
+    std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
+        return ENormalizerSequentialId::SyncPortionFromChunks;
+    }
+
+    TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
+        const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
+
+private:
+    // Built from the storage info of the init context; passed to chunk load contexts in DoInit.
+    NColumnShard::TBlobGroupSelector DsGroupSelector;
+};
+} // namespace NKikimr::NOlap::NSyncChunksWithPortions1
diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean.h b/ydb/core/tx/columnshard/normalizer/portion/clean.h
index a9b4b95bd06..6b01d65b377 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/clean.h
+++ b/ydb/core/tx/columnshard/normalizer/portion/clean.h
@@ -14,7 +14,7 @@ namespace NKikimr::NOlap {
class TCleanPortionsNormalizer: public TPortionsNormalizerBase {
public:
static TString GetClassNameStatic() {
- return ::ToString(ENormalizerSequentialId::PortionsCleaner);
+ return "PortionsCleaner";
}
private:
@@ -27,7 +27,7 @@ public:
public:
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
- return ENormalizerSequentialId::PortionsCleaner;
+ return std::nullopt;
}
virtual TString GetClassName() const override {
diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
index 5bda885409b..d581a6579d8 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
@@ -129,7 +129,7 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns(
} else {
it->second.Merge(std::move(portion));
}
- it->second.LoadRecord(currentSchema->GetIndexInfo(), loadContext);
+ it->second.LoadRecord(loadContext);
};
while (!rowset.EndOfSet()) {
diff --git a/ydb/core/tx/columnshard/normalizer/portion/portion.h b/ydb/core/tx/columnshard/normalizer/portion/portion.h
index 134b2d94ba6..812e51412f3 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/portion.h
+++ b/ydb/core/tx/columnshard/normalizer/portion/portion.h
@@ -14,7 +14,7 @@ namespace NKikimr::NOlap {
class TPortionsNormalizer : public TPortionsNormalizerBase {
public:
static TString GetClassNameStatic() {
- return ::ToString(ENormalizerSequentialId::PortionsMetadata);
+ return "PortionsMetadata";
}
private:
@@ -26,7 +26,7 @@ public:
public:
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
- return ENormalizerSequentialId::PortionsMetadata;
+ return std::nullopt;
}
virtual TString GetClassName() const override {
diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp
new file mode 100644
index 00000000000..f4fb377f3b7
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp
@@ -0,0 +1,147 @@
+#include "normalizer.h"
+#include "restore_portion_from_chunks.h"
+
+#include <ydb/core/formats/arrow/size_calcer.h>
+#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/tables_manager.h>
+
+namespace NKikimr::NOlap::NRestorePortionsFromChunks {
+
+// One pending IndexPortions update: the chunk record that carries the portion metadata,
+// paired with the schema version that has to be written for that portion.
+class TPatchItem {
+private:
+    // Declares the SchemaVersion field (default 0) together with its getter.
+    YDB_READONLY(ui32, SchemaVersion, 0);
+    TColumnChunkLoadContext ChunkInfo;
+
+public:
+    // The chunk record is taken by rvalue reference and moved into the item.
+    TPatchItem(const ui32 version, TColumnChunkLoadContext&& chunk)
+        : SchemaVersion(version)
+        , ChunkInfo(std::move(chunk))
+    {
+    }
+
+    // Read-only access to the stored chunk record.
+    const TColumnChunkLoadContext& GetChunkInfo() const {
+        return ChunkInfo;
+    }
+};
+
+// Normalizer change set: writes one IndexPortions row for every collected patch.
+class TChanges: public INormalizerChanges {
+private:
+    std::vector<TPatchItem> Patches;
+
+public:
+    // explicit: a plain vector of patches must not convert to a change set implicitly.
+    explicit TChanges(std::vector<TPatchItem>&& patches)
+        : Patches(std::move(patches)) {
+    }
+
+    // For each patch, updates the IndexPortions row keyed by (PathId, PortionId) of the chunk record.
+    // SchemaVersion comes from the patch; the remove snapshot, the deprecated min snapshot and the
+    // serialized portion metadata come from the chunk record; ShardingVersion, CommitPlanStep,
+    // CommitTxId and InsertWriteId are written as 0.
+    // Verifies that every chunk record actually carries portion metadata. Always returns true.
+    virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
+        using namespace NColumnShard;
+        using IndexPortions = NColumnShard::Schema::IndexPortions;
+        NIceDb::TNiceDb db(txc.DB);
+        for (const auto& patch : Patches) {
+            const TColumnChunkLoadContext& chunk = patch.GetChunkInfo();
+            AFL_VERIFY(chunk.GetMetaProto().HasPortionMeta());
+            auto metaProtoString = chunk.GetMetaProto().GetPortionMeta().SerializeAsString();
+            const auto removeSnapshot = chunk.GetRemoveSnapshot();
+            const auto minSnapshotDeprecated = chunk.GetMinSnapshotDeprecated();
+            db.Table<IndexPortions>()
+                .Key(chunk.GetPathId(), chunk.GetPortionId())
+                .Update(NIceDb::TUpdate<IndexPortions::SchemaVersion>(patch.GetSchemaVersion()), NIceDb::TUpdate<IndexPortions::ShardingVersion>(0),
+                    NIceDb::TUpdate<IndexPortions::CommitPlanStep>(0), NIceDb::TUpdate<IndexPortions::CommitTxId>(0),
+                    NIceDb::TUpdate<IndexPortions::InsertWriteId>(0), NIceDb::TUpdate<IndexPortions::XPlanStep>(removeSnapshot.GetPlanStep()),
+                    NIceDb::TUpdate<IndexPortions::XTxId>(removeSnapshot.GetTxId()),
+                    NIceDb::TUpdate<IndexPortions::MinSnapshotPlanStep>(minSnapshotDeprecated.GetPlanStep()),
+                    NIceDb::TUpdate<IndexPortions::MinSnapshotTxId>(minSnapshotDeprecated.GetTxId()),
+                    NIceDb::TUpdate<IndexPortions::Metadata>(metaProtoString));
+        }
+
+        return true;
+    }
+
+    // Size of the change set, measured in patches.
+    virtual ui64 GetSize() const override {
+        return Patches.size();
+    }
+};
+
+// Looks for portions that have chunk records in IndexColumns but no row in IndexPortions and
+// builds tasks that write the missing IndexPortions rows from the portion metadata stored in
+// the chunk records.
+//
+// Returns a failed conclusion ("Not ready" / "Can't load index") when the local db data is not
+// available yet or the tables manager cannot be initialized. Otherwise returns the task list,
+// which is empty when nothing has to be restored.
+TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
+    const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
+    using namespace NColumnShard;
+    // Number of portion rows written by a single task.
+    constexpr size_t patchesPerTask = 100;
+    NIceDb::TNiceDb db(txc.DB);
+
+    // Both precharges are issued unconditionally (no short-circuit) before the results are checked.
+    const bool columnsReady = Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
+    const bool portionsReady = Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme());
+    if (!columnsReady || !portionsReady) {
+        return TConclusionStatus::Fail("Not ready");
+    }
+
+    TTablesManager tablesManager(controller.GetStoragesManager(), 0);
+    if (!tablesManager.InitFromDB(db)) {
+        // Tag matches the name this normalizer is registered under.
+        ACFL_TRACE("normalizer", "RestorePortionFromChunks")("error", "can't initialize tables manager");
+        return TConclusionStatus::Fail("Can't load index");
+    }
+
+    // Ids of portions that already have an IndexPortions row. Only membership is needed below,
+    // so the loaded contexts themselves are not retained.
+    THashSet<ui64> dbPortionIds;
+    {
+        auto rowset = db.Table<Schema::IndexPortions>().Select();
+        if (!rowset.IsReady()) {
+            return TConclusionStatus::Fail("Not ready");
+        }
+
+        while (!rowset.EndOfSet()) {
+            TPortionLoadContext portion(rowset);
+            AFL_VERIFY(dbPortionIds.insert(portion.GetPortionId()).second);
+
+            if (!rowset.Next()) {
+                return TConclusionStatus::Fail("Not ready");
+            }
+        }
+    }
+
+    // Portion id -> the chunk record of that portion which carries the portion metadata.
+    THashMap<ui64, TColumnChunkLoadContext> portionsToWrite;
+    {
+        auto rowset = db.Table<Schema::IndexColumns>().Select();
+        if (!rowset.IsReady()) {
+            return TConclusionStatus::Fail("Not ready");
+        }
+
+        THashSet<ui64> portionsToRestore;
+        while (!rowset.EndOfSet()) {
+            TColumnChunkLoadContext chunk(rowset, &DsGroupSelector);
+            const auto portionId = chunk.GetPortionId();
+            if (!dbPortionIds.contains(portionId)) {
+                portionsToRestore.emplace(portionId);
+                if (chunk.GetMetaProto().HasPortionMeta()) {
+                    // The chunk is not used after this point, so it is moved rather than copied.
+                    AFL_VERIFY(portionsToWrite.emplace(portionId, std::move(chunk)).second);
+                }
+            }
+
+            if (!rowset.Next()) {
+                return TConclusionStatus::Fail("Not ready");
+            }
+        }
+        // Every portion to restore must have contributed exactly one chunk with portion metadata.
+        AFL_VERIFY(portionsToRestore.size() == portionsToWrite.size());
+    }
+
+    std::vector<INormalizerTask::TPtr> tasks;
+    if (portionsToWrite.empty()) {
+        return tasks;
+    }
+
+    std::vector<TPatchItem> package;
+
+    for (auto&& [_, chunkWithPortionData] : portionsToWrite) {
+        // Resolve the schema version first: the chunk record is moved into the patch right after.
+        const auto schemaVersion =
+            tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(chunkWithPortionData.GetMinSnapshotDeprecated())->GetVersion();
+        package.emplace_back(schemaVersion, std::move(chunkWithPortionData));
+        if (package.size() == patchesPerTask) {
+            tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package))));
+            package.clear();
+        }
+    }
+
+    // Flush the last, partially filled package.
+    if (!package.empty()) {
+        tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package))));
+    }
+    return tasks;
+}
+
+} // namespace NKikimr::NOlap::NRestorePortionsFromChunks
diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h
new file mode 100644
index 00000000000..f8120c16ef3
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/defs.h>
+#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
+
+namespace NKikimr::NColumnShard {
+class TTablesManager;
+}
+
+namespace NKikimr::NOlap::NRestorePortionsFromChunks {
+
+// Normalizer component identified by ENormalizerSequentialId::RestorePortionFromChunks.
+// Its DoInit is implemented in restore_portion_from_chunks.cpp.
+class TNormalizer: public TNormalizationController::INormalizerComponent {
+public:
+ // Name used for factory registration: the string form of the RestorePortionFromChunks enum value.
+ static TString GetClassNameStatic() {
+ return ::ToString(ENormalizerSequentialId::RestorePortionFromChunks);
+ }
+
+ // This normalizer has a sequential id, so the optional is always engaged.
+ virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
+ return ENormalizerSequentialId::RestorePortionFromChunks;
+ }
+
+ // Same value as GetClassNameStatic().
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+
+ // Forward declaration only; nothing in this header defines or uses it.
+ class TNormalizerResult;
+
+ // Registers TNormalizer in INormalizerComponent::TFactory under GetClassNameStatic().
+ static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator =
+ INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic());
+
+public:
+ // Builds the blob group selector from the storage info of the init context.
+ TNormalizer(const TNormalizationController::TInitContext& info)
+ : DsGroupSelector(info.GetStorageInfo()) {
+ }
+
+ // Produces the normalization tasks, or a failed conclusion; see the .cpp for the details.
+ virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
+ const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
+
+private:
+ // Passed by pointer to TColumnChunkLoadContext when IndexColumns rows are read in DoInit.
+ NColumnShard::TBlobGroupSelector DsGroupSelector;
+};
+} // namespace NKikimr::NOlap::NRestorePortionsFromChunks
diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make
index e53cf5497b8..386cbe711bc 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/ya.make
+++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make
@@ -8,6 +8,8 @@ SRCS(
GLOBAL clean_empty.cpp
GLOBAL broken_blobs.cpp
GLOBAL special_cleaner.cpp
+ GLOBAL chunks_actualization.cpp
+ GLOBAL restore_portion_from_chunks.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
index 7fe1cb04405..bfd5bd88909 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
@@ -37,66 +37,12 @@ public:
virtual ~TNormalizerChecker() {
}
- virtual ui64 RecordsCountAfterReboot(const ui64 initialRecodsCount) const {
- return initialRecodsCount;
- }
-};
-
-class TPathIdCleaner: public NYDBTest::ILocalDBModifier {
-public:
- virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
- using namespace NColumnShard;
- NIceDb::TNiceDb db(txc.DB);
-
- THashMap<ui64, TPortionRecord> portion2Key;
- std::optional<ui64> pathId;
- {
- auto rowset = db.Table<Schema::IndexColumns>().Select();
- UNIT_ASSERT(rowset.IsReady());
-
- while (!rowset.EndOfSet()) {
- TPortionRecord key;
- key.Index = rowset.GetValue<Schema::IndexColumns::Index>();
- key.Granule = rowset.GetValue<Schema::IndexColumns::Granule>();
- key.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>();
- key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>();
- key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>();
- key.Portion = rowset.GetValue<Schema::IndexColumns::Portion>();
- key.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>();
-
- key.XPlanStep = rowset.GetValue<Schema::IndexColumns::XPlanStep>();
- key.XTxId = rowset.GetValue<Schema::IndexColumns::XTxId>();
- key.Blob = rowset.GetValue<Schema::IndexColumns::Blob>();
- key.Metadata = rowset.GetValue<Schema::IndexColumns::Metadata>();
- key.Offset = rowset.GetValue<Schema::IndexColumns::Offset>();
- key.Size = rowset.GetValue<Schema::IndexColumns::Size>();
-
- pathId = rowset.GetValue<Schema::IndexColumns::PathId>();
-
- portion2Key[key.Portion] = key;
-
- UNIT_ASSERT(rowset.Next());
- }
- }
-
- UNIT_ASSERT(pathId.has_value());
-
- for (auto&& [portionId, key] : portion2Key) {
- db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk).Delete();
+ virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& /*columnShardConfig*/) const {
- db.Table<Schema::IndexColumns>()
- .Key(key.Index, 1, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk)
- .Update(NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(key.XPlanStep), NIceDb::TUpdate<Schema::IndexColumns::XTxId>(key.XTxId),
- NIceDb::TUpdate<Schema::IndexColumns::Blob>(key.Blob), NIceDb::TUpdate<Schema::IndexColumns::Metadata>(key.Metadata),
- NIceDb::TUpdate<Schema::IndexColumns::Offset>(key.Offset), NIceDb::TUpdate<Schema::IndexColumns::Size>(key.Size),
-
- NIceDb::TNull<Schema::IndexColumns::PathId>());
- }
+ }
- db.Table<Schema::IndexGranules>()
- .Key(0, *pathId, "1")
- .Update(NIceDb::TUpdate<Schema::IndexGranules::Granule>(1), NIceDb::TUpdate<Schema::IndexGranules::PlanStep>(1),
- NIceDb::TUpdate<Schema::IndexGranules::TxId>(1), NIceDb::TUpdate<Schema::IndexGranules::Metadata>(""));
+ virtual ui64 RecordsCountAfterReboot(const ui64 initialRecodsCount) const {
+ return initialRecodsCount;
}
};
@@ -308,9 +254,7 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
- auto* repair = runtime.GetAppData().ColumnShardConfig.MutableRepairs()->Add();
- repair->SetClassName("SchemaVersionCleaner");
- repair->SetDescription("Removing unused schema versions");
+ checker.CorrectConfigurationOnStart(runtime.GetAppData().ColumnShardConfig);
const ui64 tableId = 1;
const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)),
@@ -344,10 +288,6 @@ Y_UNIT_TEST_SUITE(Normalizers) {
}
}
- Y_UNIT_TEST(PathIdNormalizer) {
- TestNormalizerImpl<TPathIdCleaner>();
- }
-
Y_UNIT_TEST(ColumnChunkNormalizer) {
TestNormalizerImpl<TColumnChunksCleaner>();
}
@@ -357,7 +297,15 @@ Y_UNIT_TEST_SUITE(Normalizers) {
}
Y_UNIT_TEST(SchemaVersionsNormalizer) {
- TestNormalizerImpl<TSchemaVersionsCleaner>();
+        // Checker that adds the SchemaVersionCleaner repair to the column shard config before start.
+        class TLocalNormalizerChecker: public TNormalizerChecker {
+        public:
+            virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override {
+                auto& repairEntry = *columnShardConfig.MutableRepairs()->Add();
+                repairEntry.SetDescription("Removing unused schema versions");
+                repairEntry.SetClassName("SchemaVersionCleaner");
+            }
+        };
+ TestNormalizerImpl<TSchemaVersionsCleaner>(TLocalNormalizerChecker());
}
Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
@@ -371,6 +319,11 @@ Y_UNIT_TEST_SUITE(Normalizers) {
ui64 RecordsCountAfterReboot(const ui64) const override {
return 0;
}
+            // Adds the PortionsCleaner repair to the column shard config before start.
+            virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override {
+                auto& repairEntry = *columnShardConfig.MutableRepairs()->Add();
+                repairEntry.SetDescription("Removing dirty portions withno tables");
+                repairEntry.SetClassName("PortionsCleaner");
+            }
};
TLocalNormalizerChecker checker;
TestNormalizerImpl<TTablesCleaner>(checker);