diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-16 15:16:27 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-16 16:39:12 +0300 |
commit | 5c2405ab8c1e8cf1ba22505d529bdc0d96ecf675 (patch) | |
tree | 243e5bc1fab3f529efe4aaf31dc6c06933ade14e | |
parent | 22d756319c13b10fe917926367e9919ac205f05e (diff) | |
download | ydb-5c2405ab8c1e8cf1ba22505d529bdc0d96ecf675.tar.gz |
KIKIMR-18932: meta incapsulation for merging from different chunks
37 files changed, 609 insertions, 383 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index c0473293c9..5a7cf7c09e 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -87,6 +87,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 25dbd82022..a059a18ad9 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -88,6 +88,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 25dbd82022..a059a18ad9 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -88,6 +88,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index c0473293c9..5a7cf7c09e 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -87,6 +87,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp new file mode 100644 index 0000000000..3ec637e786 --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -0,0 +1,28 @@ +#include "columnshard_schema.h" + +namespace NKikimr::NColumnShard { + +bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index, const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback) { + auto rowset = db.Table<IndexColumns>().Prefix(index).Select(); + if (!rowset.IsReady()) + return false; + + while (!rowset.EndOfSet()) { + NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty(); + portion.SetGranule(rowset.GetValue<IndexColumns::Granule>()); + portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()); + portion.SetPortion(rowset.GetValue<IndexColumns::Portion>()); + + NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, dsGroupSelector); + + portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>()); + + callback(portion, chunkLoadContext); + + if (!rowset.Next()) + return false; + } + return true; +} + +} diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index e4b8538503..d4e734b727 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -12,6 +12,10 @@ #include <type_traits> +namespace NKikimr::NOlap { +class TColumnChunkLoadContext; +} + namespace NKikimr::NColumnShard { using NOlap::TWriteId; @@ -592,15 +596,20 @@ struct Schema : NIceDb::Schema { // IndexColumns activities static void IndexColumns_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { + auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk); + auto rowProto = row.GetMeta().SerializeToProto(); + if (proto) { + *rowProto.MutablePortionMeta() = std::move(*proto); + } db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId, portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update( - NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()), - NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()), - NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()), - NIceDb::TUpdate<IndexColumns::Metadata>(row.Metadata), - NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset), - NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size) - ); + NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()), + NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()), + NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()), + NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()), + NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset), + NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size) + ); } static void IndexColumns_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) { @@ -609,37 +618,7 @@ struct Schema : NIceDb::Schema { } static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index, - const std::function<void(const NOlap::TPortionInfo&, const TColumnRecord&)>& callback) { - auto rowset = db.Table<IndexColumns>().Prefix(index).Select(); - if (!rowset.IsReady()) - return false; - - while (!rowset.EndOfSet()) { - TColumnRecord row; - NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty(); - portion.SetGranule(rowset.GetValue<IndexColumns::Granule>()); - row.ColumnId = rowset.GetValue<IndexColumns::ColumnIdx>(); - portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()); - portion.SetPortion(rowset.GetValue<IndexColumns::Portion>()); - row.Chunk = rowset.GetValue<IndexColumns::Chunk>(); - portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>()); - - TString strBlobId = rowset.GetValue<IndexColumns::Blob>(); - row.Metadata = rowset.GetValue<IndexColumns::Metadata>(); - row.BlobRange.Offset = rowset.GetValue<IndexColumns::Offset>(); - row.BlobRange.Size = rowset.GetValue<IndexColumns::Size>(); - - Y_VERIFY(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size()); - TLogoBlobID logoBlobId((const ui64*)strBlobId.data()); - row.BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId); - - callback(portion, row); - - if (!rowset.Next()) - return false; - } - return true; - } + const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback); // IndexCounters @@ -688,3 +667,49 @@ struct Schema : NIceDb::Schema { }; } + +namespace NKikimr::NOlap { +class TColumnChunkLoadContext { +private: + YDB_READONLY_DEF(TBlobRange, BlobRange); + TChunkAddress Address; + YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto); +public: + const TChunkAddress& GetAddress() const { + return Address; + } + + TColumnChunkLoadContext(const TChunkAddress& address, const TBlobRange& bRange, const NKikimrTxColumnShard::TIndexColumnMeta& metaProto) + : BlobRange(bRange) + , Address(address) + , MetaProto(metaProto) + { + + } + + template <class TSource> + TColumnChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) + : Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) { + AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString()); + TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Blob>(); + Y_VERIFY(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size()); + TLogoBlobID logoBlobId((const ui64*)strBlobId.data()); + BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId); + BlobRange.Offset = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Offset>(); + BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Size>(); + AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString()); + + const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Metadata>(); + AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf"); + } + + const NKikimrTxColumnShard::TIndexPortionMeta* GetPortionMeta() const { + if (MetaProto.HasPortionMeta()) { + return &MetaProto.GetPortionMeta(); + } else { + return nullptr; + } + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index 315cfbe91c..073823a663 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -38,7 +38,7 @@ bool TTTLColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyC const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion); Y_VERIFY(oldInfo.IsActive()); - Y_VERIFY(portionInfo.TierName != oldInfo.TierName); + Y_VERIFY(portionInfo.GetMeta().GetTierName() != oldInfo.GetMeta().GetTierName()); self.UpsertPortion(portionInfo, &oldInfo); @@ -60,7 +60,7 @@ void TTTLColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWr auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); // Mark exported blobs if (evictionFeatures.NeedExport) { - auto& tierName = portionInfo.TierName; + auto& tierName = portionInfo.GetMeta().GetTierName(); Y_VERIFY(!tierName.empty()); for (auto& rec : portionInfo.Records) { @@ -161,14 +161,14 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portio const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords, TConstructionContext& context) const { TPortionInfo& portionInfo = portionInfoWithBlobs.GetPortionInfo(); - Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName); + Y_VERIFY(portionInfo.GetMeta().GetTierName() != evictFeatures.TargetTierName); auto* tiering = Tiering.FindPtr(evictFeatures.PathId); Y_VERIFY(tiering); auto compression = tiering->GetCompression(evictFeatures.TargetTierName); if (!compression) { // Noting to recompress. We have no other kinds of evictions yet. - portionInfo.TierName = evictFeatures.TargetTierName; + portionInfo.MutableMeta().SetTierName(evictFeatures.TargetTierName); evictFeatures.DataChanges = false; return true; } @@ -191,21 +191,22 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portio auto field = resultSchema->GetFieldByIndex(pos); TString blob; + std::shared_ptr<arrow::RecordBatch> rb; { auto it = srcBlobs.find(rec.BlobRange); Y_VERIFY(it != srcBlobs.end()); - auto rb = resultSchema->GetColumnLoader(rec.ColumnId)->Apply(it->second); - Y_VERIFY(rb.ok()); + rb = NArrow::TStatusValidator::GetValid(resultSchema->GetColumnLoader(rec.ColumnId)->Apply(it->second)); auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext); - blob = columnSaver.Apply(*rb); + blob = columnSaver.Apply(rb); } + Y_VERIFY(rb->num_columns() == 1); if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { return false; } if (portionInfoWithBlobs.GetBlobs().empty() || portionInfoWithBlobs.GetBlobs().back().GetSize() + blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { - portionInfoWithBlobs.StartBlob(0).AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, 0, blob)); + portionInfoWithBlobs.StartBlob(0).AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, blob, rb->column(0)), blobSchema->GetIndexInfo()); } else { - portionInfoWithBlobs.GetBlobs().back().AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, 0, blob)); + portionInfoWithBlobs.GetBlobs().back().AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, blob, rb->column(0)), blobSchema->GetIndexInfo()); } } @@ -230,7 +231,7 @@ NKikimr::TConclusionStatus TTTLColumnEngineChanges::DoConstructBlobs(TConstructi for (auto& [portionInfo, evictFeatures] : PortionsToEvict) { if (UpdateEvictedPortion(portionInfo, evictFeatures, Blobs, EvictedRecords, context)) { - Y_VERIFY(portionInfo.GetPortionInfo().TierName == evictFeatures.TargetTierName); + Y_VERIFY(portionInfo.GetPortionInfo().GetMeta().GetTierName() == evictFeatures.TargetTierName); evicted.emplace_back(std::move(portionInfo), evictFeatures); } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index a31e93a0bf..71a61f013d 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -18,7 +18,7 @@ void TChangesWithAppend::DoDebugString(TStringOutput& out) const { void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& /*context*/) { for (auto& portionInfo : AppendedPortions) { - switch (portionInfo.GetPortionInfo().Meta.Produced) { + switch (portionInfo.GetPortionInfo().GetMeta().Produced) { case NOlap::TPortionMeta::EProduced::UNSPECIFIED: Y_VERIFY(false); // unexpected case NOlap::TPortionMeta::EProduced::INSERTED: @@ -113,7 +113,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions( auto& blobInfo = infoWithBlob.StartBlob(blob.size()); for (auto&& chunk : blob) { const TString data = chunk.GetData(); - srcBlobs.emplace(blobInfo.AddChunk(infoWithBlob, std::move(chunk)).BlobRange, data); + srcBlobs.emplace(blobInfo.AddChunk(infoWithBlob, std::move(chunk), resultSchema->GetIndexInfo()).BlobRange, data); } } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_appended", infoWithBlob.GetPortionInfo().DebugString()); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 8f43be3fc9..aad865264e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -7,6 +7,7 @@ #include <ydb/core/formats/arrow/merging_sorted_input_stream.h> #include <ydb/core/tx/tiering/manager.h> #include <ydb/core/tx/columnshard/columnshard_ttl.h> +#include <ydb/core/tx/columnshard/columnshard_schema.h> #include <ydb/library/conclusion/status.h> #include "changes/indexation.h" #include "changes/in_granule_compaction.h" @@ -98,12 +99,10 @@ TColumnEngineStats::TPortionsStats DeltaStats(const TPortionInfo& portionInfo, u TColumnEngineStats::TPortionsStats deltaStats; THashSet<TUnifiedBlobId> blobs; for (auto& rec : portionInfo.Records) { - metadataBytes += rec.Metadata.size(); + metadataBytes += rec.GetMeta().GetMetadataSize(); blobs.insert(rec.BlobRange.BlobId); deltaStats.BytesByColumn[rec.ColumnId] += rec.BlobRange.Size; - } - for (auto& rec : portionInfo.Meta.ColumnMeta) { - deltaStats.RawBytesByColumn[rec.first] += rec.second.RawBytes; + deltaStats.RawBytesByColumn[rec.ColumnId] += rec.GetMeta().GetRawBytes(); } deltaStats.Rows = portionInfo.NumRows(); deltaStats.RawBytes = portionInfo.RawBytesSum(); @@ -123,16 +122,16 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c ui64 metadataBytes = 0; TColumnEngineStats::TPortionsStats deltaStats = DeltaStats(portionInfo, metadataBytes); - Y_VERIFY(!exPortionInfo || exPortionInfo->Meta.Produced != TPortionMeta::EProduced::UNSPECIFIED); - Y_VERIFY(portionInfo.Meta.Produced != TPortionMeta::EProduced::UNSPECIFIED); + Y_VERIFY(!exPortionInfo || exPortionInfo->GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED); + Y_VERIFY(portionInfo.GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED); TColumnEngineStats::TPortionsStats& srcStats = exPortionInfo ? (exPortionInfo->IsActive() - ? engineStats.StatsByType[exPortionInfo->Meta.Produced] + ? engineStats.StatsByType[exPortionInfo->GetMeta().Produced] : engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]) - : engineStats.StatsByType[portionInfo.Meta.Produced]; + : engineStats.StatsByType[portionInfo.GetMeta().Produced]; TColumnEngineStats::TPortionsStats& stats = portionInfo.IsActive() - ? engineStats.StatsByType[portionInfo.Meta.Produced] + ? engineStats.StatsByType[portionInfo.GetMeta().Produced] : engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]; const bool isErase = updateType == EStatsUpdateType::ERASE; @@ -236,16 +235,19 @@ bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) { } bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) { - return ColumnsTable->Load(db, [&](const TPortionInfo& portion, const TColumnRecord& rec) { - auto& indexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo(); + TSnapshot lastSnapshot(0, 0); + const TIndexInfo* currentIndexInfo = nullptr; + return ColumnsTable->Load(db, [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { + if (!currentIndexInfo || lastSnapshot != portion.GetMinSnapshot()) { + currentIndexInfo = &VersionedIndex.GetSchema(portion.GetMinSnapshot())->GetIndexInfo(); + lastSnapshot = portion.GetMinSnapshot(); + } Y_VERIFY(portion.ValidSnapshotInfo()); - Y_VERIFY(rec.Valid()); // Do not count the blob as lost since it exists in the index. - lostBlobs.erase(rec.BlobRange.BlobId); + lostBlobs.erase(loadContext.GetBlobRange().BlobId); // Locate granule and append the record. - const auto gi = Granules.find(portion.GetGranule()); - Y_VERIFY(gi != Granules.end()); - gi->second->AddColumnRecord(indexInfo, portion, rec); + TColumnRecord rec(loadContext, *currentIndexInfo); + GetGranulePtrVerified(portion.GetGranule())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta()); }); } @@ -327,6 +329,7 @@ std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompacti std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size()); auto changes = TChangesConstructor::BuildCleanupChanges(snapshot); ui32 affectedRecords = 0; @@ -364,6 +367,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( } if (affectedRecords > maxRecords) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size()); return changes; } @@ -392,10 +396,12 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( break; } } else { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup/Check")("snapshot", snapshot.DebugString())("portions_snapshot", portionInfo->GetRemoveSnapshot().DebugString()); Y_VERIFY(portionInfo->CheckForCleanup()); ++it; } } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size()); return changes; } @@ -446,13 +452,13 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering const TInstant maxTtlPortionInstant = *mpiOpt; const TDuration d = maxTtlPortionInstant - expireTimestamp; keep = !!d; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestamp.Seconds()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestamp.Seconds()); if (d && dWaiting > d) { dWaiting = d; } } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop); if (keep && tryEvictPortion) { TString tierName; for (auto& tierRef : ttl.GetOrderedTiers()) { @@ -477,8 +483,8 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering break; } } - if (info.TierName != tierName) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", info.TierName)("to", tierName); + if (info.GetMeta().GetTierName() != tierName) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", info.GetMeta().GetTierName())("to", tierName); evictionSize += info.BlobsSizes().first; const bool needExport = ttl.NeedExport(tierName); context.Changes->AddPortionToEvict(info, TPortionEvictionFeatures(tierName, pathId, needExport)); @@ -678,7 +684,7 @@ static TMap<TSnapshot, std::vector<const TPortionInfo*>> GroupPortionsBySnapshot if (visible) { out[recSnapshot].push_back(&portionInfo); } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "GroupPortionsBySnapshot")("analyze_portion", portionInfo.DebugString())("visible", visible)("snapshot", snapshot.DebugString()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "GroupPortionsBySnapshot")("analyze_portion", portionInfo.DebugString())("visible", visible)("snapshot", snapshot.DebugString()); } return out; } diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h index 42f2eb87db..517a90c8a4 100644 --- a/ydb/core/tx/columnshard/engines/columns_table.h +++ b/ydb/core/tx/columnshard/engines/columns_table.h @@ -21,7 +21,7 @@ public: db.EraseColumn(IndexId, portion, row); } - bool Load(IDbWrapper& db, std::function<void(const TPortionInfo&, const TColumnRecord&)> callback) { + bool Load(IDbWrapper& db, std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)> callback) { return db.LoadColumns(IndexId, callback); } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index 5aba26b580..4bef7aa140 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -65,7 +65,7 @@ void TDbWrapper::EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, con NColumnShard::Schema::IndexColumns_Erase(db, index, portion, row); } -bool TDbWrapper::LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnRecord&)>& callback) { +bool TDbWrapper::LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) { NIceDb::TNiceDb db(Database); return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, index, callback); } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index 9a1572590a..876a2d4c87 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -7,6 +7,7 @@ class TDatabase; namespace NKikimr::NOlap { +class TColumnChunkLoadContext; struct TInsertedData; class TInsertTableAccessor; struct TColumnRecord; @@ -34,7 +35,7 @@ public: virtual void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0; virtual void EraseColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0; - virtual bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnRecord&)>& callback) = 0; + virtual bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0; virtual void WriteCounter(ui32 index, ui32 counterId, ui64 value) = 0; virtual bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) = 0; @@ -63,7 +64,7 @@ public: void WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; void EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; - bool LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnRecord&)>& callback) override; + bool LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override; void WriteCounter(ui32 index, ui32 counterId, ui64 value) override; bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) override; diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt index 6db0e70f31..b8559cce9c 100644 --- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt @@ -27,6 +27,7 @@ target_sources(columnshard-engines-portions PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/common.cpp ) generate_enum_serilization(columnshard-engines-portions ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt index 7c602514cd..679d414a1e 100644 --- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt @@ -28,6 +28,7 @@ target_sources(columnshard-engines-portions PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/common.cpp ) generate_enum_serilization(columnshard-engines-portions ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt index 7c602514cd..679d414a1e 100644 --- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt @@ -28,6 +28,7 @@ target_sources(columnshard-engines-portions PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/common.cpp ) generate_enum_serilization(columnshard-engines-portions ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt index 6db0e70f31..b8559cce9c 100644 --- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt @@ -27,6 +27,7 @@ target_sources(columnshard-engines-portions PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/common.cpp ) generate_enum_serilization(columnshard-engines-portions ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.cpp b/ydb/core/tx/columnshard/engines/portions/column_record.cpp index 621928056a..8b69b6447d 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.cpp +++ b/ydb/core/tx/columnshard/engines/portions/column_record.cpp @@ -1,5 +1,73 @@ #include "column_record.h" +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/tx/columnshard/common/scalars.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/columnshard_schema.h> namespace NKikimr::NOlap { +TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo& indexInfo) { + auto field = indexInfo.ArrowColumnField(context.GetAddress().GetColumnId()); + if (context.GetMetaProto().HasNumRows()) { + NumRows = context.GetMetaProto().GetNumRows(); + } + if (context.GetMetaProto().HasRawBytes()) { + RawBytes = context.GetMetaProto().GetRawBytes(); + } + if (context.GetMetaProto().HasMinValue()) { + Min = ConstantToScalar(context.GetMetaProto().GetMinValue(), field->type()); + } + if (context.GetMetaProto().HasMaxValue()) { + Max = ConstantToScalar(context.GetMetaProto().GetMaxValue(), field->type()); + } +} + +TChunkMeta::TChunkMeta(const std::shared_ptr<arrow::Array>& column, const ui32 columnId, const TIndexInfo& indexInfo) { + Y_VERIFY(column); + Y_VERIFY(column->length()); + NumRows = column->length(); + RawBytes = NArrow::GetArrayDataSize(column); + + if (indexInfo.GetMinMaxIdxColumns().contains(columnId)) { + std::pair<i32, i32> minMaxPos = {0, (column->length() - 1)}; + if (!indexInfo.IsSortedColumn(columnId)) { + minMaxPos = NArrow::FindMinMaxPosition(column); + Y_VERIFY(minMaxPos.first >= 0); + Y_VERIFY(minMaxPos.second >= 0); + } + + Min = NArrow::GetScalar(column, minMaxPos.first); + Max = NArrow::GetScalar(column, minMaxPos.second); + + Y_VERIFY(Min); + Y_VERIFY(Max); + } +} + +NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const { + NKikimrTxColumnShard::TIndexColumnMeta meta; + meta.SetNumRows(NumRows); + meta.SetRawBytes(RawBytes); + if (HasMinMax()) { + ScalarToConstant(*Min, *meta.MutableMinValue()); + ScalarToConstant(*Max, *meta.MutableMaxValue()); + } + return meta; +} + +TColumnRecord::TColumnRecord(const TColumnChunkLoadContext& loadContext, const TIndexInfo& info) + : Meta(loadContext, info) + , ColumnId(loadContext.GetAddress().GetColumnId()) + , Chunk(loadContext.GetAddress().GetChunk()) + , BlobRange(loadContext.GetBlobRange()) +{ +} + +TColumnRecord::TColumnRecord(const TChunkAddress& address, const std::shared_ptr<arrow::Array>& column, const TIndexInfo& info) + : Meta(column, address.GetColumnId(), info) + , ColumnId(address.GetColumnId()) + , Chunk(address.GetChunk()) +{ +} + } diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h index b36e8516de..13ca916783 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.h +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -1,22 +1,63 @@ #pragma once +#include "common.h" +#include <ydb/core/protos/tx_columnshard.pb.h> #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/common/snapshot.h> +#include <ydb/library/accessor/accessor.h> +#include <util/string/builder.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::NOlap { +class TColumnChunkLoadContext; +struct TIndexInfo; + +struct TChunkMeta { +private: + YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, Min); + YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, Max); + YDB_READONLY(ui32, NumRows, 0); + YDB_READONLY(ui32, RawBytes, 0); +public: + ui64 GetMetadataSize() const { + return sizeof(NumRows) + sizeof(RawBytes) + 8 * 3 * 2; + } + + bool HasMinMax() const noexcept { + return Min.get() && Max.get(); + } + + NKikimrTxColumnShard::TIndexColumnMeta SerializeToProto() const; + + TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo& indexInfo); + + TChunkMeta(const std::shared_ptr<arrow::Array>& column, const ui32 columnId, const TIndexInfo& indexInfo); +}; struct TColumnRecord { +private: + TChunkMeta Meta; +public: ui32 ColumnId = 0; - ui16 Chunk; // Number of blob for column ColumnName in Portion + ui16 Chunk = 0; TBlobRange BlobRange; - TString Metadata; + + const TChunkMeta& GetMeta() const { + return Meta; + } + + TChunkAddress GetAddress() const { + return TChunkAddress(ColumnId, Chunk); + } bool IsEqualTest(const TColumnRecord& item) const { return ColumnId == item.ColumnId && Chunk == item.Chunk; } std::optional<ui32> GetChunkRowsCount() const { - return {}; + return Meta.GetNumRows(); } bool Valid() const { @@ -38,12 +79,9 @@ struct TColumnRecord { return BlobRange.BlobId.IsValid() && BlobRange.Size; } - static TColumnRecord Make(ui32 columnId, ui16 chunk = 0) { - TColumnRecord row; - row.ColumnId = columnId; - row.Chunk = chunk; - return row; - } + TColumnRecord(const TChunkAddress& address, const std::shared_ptr<arrow::Array>& column, const TIndexInfo& info); + + TColumnRecord(const TColumnChunkLoadContext& loadContext, const TIndexInfo& info); friend IOutputStream& operator << (IOutputStream& out, const TColumnRecord& rec) { out << '{'; diff --git a/ydb/core/tx/columnshard/engines/portions/common.cpp b/ydb/core/tx/columnshard/engines/portions/common.cpp new file mode 100644 index 0000000000..e18ca98033 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/common.cpp @@ -0,0 +1,10 @@ +#include "common.h" +#include <util/string/builder.h> + +namespace NKikimr::NOlap { + +TString TChunkAddress::DebugString() const { + return TStringBuilder() << "(column_id=" << ColumnId << ";chunk=" << Chunk << ";)"; +} + +} diff --git a/ydb/core/tx/columnshard/engines/portions/common.h b/ydb/core/tx/columnshard/engines/portions/common.h new file mode 100644 index 0000000000..b0305fa53e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/common.h @@ -0,0 +1,29 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NOlap { + +class TChunkAddress { +private: + YDB_READONLY(ui32, ColumnId, 0); + YDB_READONLY(ui16, Chunk, 0); +public: + TChunkAddress(const ui32 columnId, const ui16 chunk) + : ColumnId(columnId) + , Chunk(chunk) { + + } + + bool operator<(const TChunkAddress& address) const { + return std::tie(ColumnId, Chunk) < std::tie(address.ColumnId, address.Chunk); + } + + bool operator==(const TChunkAddress& address) const { + return std::tie(ColumnId, Chunk) == std::tie(address.ColumnId, address.Chunk); + } + + TString DebugString() const; +}; + + +} diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index ac2c17ad66..2aa1efd2ec 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -1,5 +1,112 @@ #include "meta.h" +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/formats/arrow/arrow_filter.h> +#include <library/cpp/actors/core/log.h> namespace NKikimr::NOlap { +void TPortionMeta::FillBatchInfo(const std::shared_ptr<arrow::RecordBatch> batch, const TIndexInfo& indexInfo) { + { + auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); + std::vector<bool> bits(batch->num_rows(), false); + bits[0] = true; + bits[batch->num_rows() - 1] = true; // it could be 0 if batch has one row + + auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows()); + auto res = arrow::compute::Filter(keyBatch, filter); + Y_VERIFY(res.ok()); + + ReplaceKeyEdges = res->record_batch(); + Y_VERIFY(ReplaceKeyEdges->num_rows() == 1 || ReplaceKeyEdges->num_rows() == 2); + } + + auto edgesBatch = NArrow::ExtractColumns(ReplaceKeyEdges, indexInfo.GetIndexKey()); + IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); + IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); +} + +bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) { + const bool compositeIndexKey = indexInfo.IsCompositeIndexKey(); + if (Produced != TPortionMeta::EProduced::UNSPECIFIED) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication"); + return false; + } + TierName = portionMeta.GetTierName(); + if (portionMeta.GetIsInserted()) { + Produced = TPortionMeta::EProduced::INSERTED; + } else if (portionMeta.GetIsCompacted()) { + Produced = TPortionMeta::EProduced::COMPACTED; + } else if (portionMeta.GetIsSplitCompacted()) { + Produced = TPortionMeta::EProduced::SPLIT_COMPACTED; + } else if (portionMeta.GetIsEvicted()) { + Produced = TPortionMeta::EProduced::EVICTED; + } else { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")("meta", portionMeta.DebugString()); + return false; + } + + if (portionMeta.HasPrimaryKeyBorders()) { + ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); + Y_VERIFY(ReplaceKeyEdges); + Y_VERIFY_DEBUG(ReplaceKeyEdges->ValidateFull().ok()); + Y_VERIFY(ReplaceKeyEdges->num_rows() == 1 || ReplaceKeyEdges->num_rows() == 2); + + if (compositeIndexKey) { + auto edgesBatch = NArrow::ExtractColumns(ReplaceKeyEdges, indexInfo.GetIndexKey()); + Y_VERIFY(edgesBatch); + IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); + IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); + } + } + return true; +} + +std::optional<NKikimrTxColumnShard::TIndexPortionMeta> TPortionMeta::SerializeToProto(const ui32 columnId, const ui32 chunk) const { + if (columnId != FirstPkColumn || chunk != 0) { + return {}; + } + + NKikimrTxColumnShard::TIndexPortionMeta portionMeta; + portionMeta.SetTierName(TierName); + + switch (Produced) { + case TPortionMeta::EProduced::UNSPECIFIED: + Y_VERIFY(false); + case TPortionMeta::EProduced::INSERTED: + portionMeta.SetIsInserted(true); + break; + case TPortionMeta::EProduced::COMPACTED: + portionMeta.SetIsCompacted(true); + break; + case TPortionMeta::EProduced::SPLIT_COMPACTED: + portionMeta.SetIsSplitCompacted(true); + break; + case TPortionMeta::EProduced::EVICTED: + portionMeta.SetIsEvicted(true); + break; + case TPortionMeta::EProduced::INACTIVE: + Y_FAIL("Unexpected inactive case"); + //portionMeta->SetInactive(true); + break; + } + + if (const auto& keyEdgesBatch = ReplaceKeyEdges) { + Y_VERIFY(keyEdgesBatch); + Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok()); + Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2); + portionMeta.SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch)); + } + return portionMeta; +} + +TString TPortionMeta::DebugString() const { + TStringBuilder sb; + sb << "(produced=" << Produced << ";"; + if (TierName) { + sb << "tier_name=" << TierName << ";"; + } + sb << ")"; + return sb; +} + } diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index 8095be85ca..8ab306a8ab 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -1,68 +1,40 @@ #pragma once #include <ydb/core/tx/columnshard/common/portion.h> #include <ydb/core/formats/arrow/replace_key.h> +#include <ydb/core/protos/tx_columnshard.pb.h> #include <util/stream/output.h> namespace NKikimr::NOlap { +struct TIndexInfo; + struct TPortionMeta { +private: + void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); + std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows + YDB_ACCESSOR_DEF(TString, TierName); +public: using EProduced = NPortion::EProduced; - struct TColumnMeta { - ui32 NumRows{0}; - ui32 RawBytes{0}; - std::shared_ptr<arrow::Scalar> Min; - std::shared_ptr<arrow::Scalar> Max; - - bool HasMinMax() const noexcept { - return Min.get() && Max.get(); - } - }; - - EProduced GetProduced() const { - return Produced; - } - - EProduced Produced{EProduced::UNSPECIFIED}; - THashMap<ui32, TColumnMeta> ColumnMeta; - ui32 FirstPkColumn = 0; - std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows std::optional<NArrow::TReplaceKey> IndexKeyStart; std::optional<NArrow::TReplaceKey> IndexKeyEnd; + EProduced Produced{EProduced::UNSPECIFIED}; + ui32 FirstPkColumn = 0; - TString DebugString() const { - return TStringBuilder() << - "produced:" << Produced << ";" - ; - } + bool DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo); + + std::optional<NKikimrTxColumnShard::TIndexPortionMeta> SerializeToProto(const ui32 columnId, const ui32 chunk) const; - bool HasMinMax(ui32 columnId) const { - if (!ColumnMeta.contains(columnId)) { - return false; - } - return ColumnMeta.find(columnId)->second.HasMinMax(); - } + void FillBatchInfo(const std::shared_ptr<arrow::RecordBatch> batch, const TIndexInfo& indexInfo); - bool HasPkMinMax() const { - return HasMinMax(FirstPkColumn); + EProduced GetProduced() const { + return Produced; } - ui32 NumRows() const { - if (FirstPkColumn) { - Y_VERIFY(ColumnMeta.contains(FirstPkColumn)); - return ColumnMeta.find(FirstPkColumn)->second.NumRows; - } - return 0; - } + TString DebugString() const; friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) { - out << "reason" << (ui32)info.Produced; - for (const auto& [_, meta] : info.ColumnMeta) { - if (meta.NumRows) { - out << " " << meta.NumRows << " rows"; - break; - } - } + out << info.DebugString(); return out; } }; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 1f152049fc..df3abc327e 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -18,221 +18,52 @@ const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) } } if (maxChunk) { - Y_VERIFY(*maxChunk + 1 == record.Chunk); + AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk); } else { - Y_VERIFY(0 == record.Chunk); + AFL_VERIFY(0 == record.Chunk)("record", record.Chunk); } Records.emplace_back(std::move(record)); return Records.back(); } -void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) { - Y_VERIFY(column->length()); - - std::pair<int, int> minMaxPos = {0, (column->length() - 1)}; - if (!sorted) { - minMaxPos = NArrow::FindMinMaxPosition(column); - } - - Y_VERIFY(minMaxPos.first >= 0); - Y_VERIFY(minMaxPos.second >= 0); - - Meta.ColumnMeta[columnId].Min = NArrow::GetScalar(column, minMaxPos.first); - Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second); -} - void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName) { const auto& indexInfo = snapshotSchema.GetIndexInfo(); - const auto& minMaxColumns = indexInfo.GetMinMaxIdxColumns(); - TierName = tierName; Meta = {}; Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); - - // Copy first and last key rows into new batch to free source batch's memory - { - auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); - std::vector<bool> bits(batch->num_rows(), false); - bits[0] = true; - bits[batch->num_rows() - 1] = true; // it colud be 0 if batch has one row - - auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows()); - auto res = arrow::compute::Filter(keyBatch, filter); - Y_VERIFY(res.ok()); - - Meta.ReplaceKeyEdges = res->record_batch(); - Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2); - } - - auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey()); - Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); - Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); - - /// @note It does not add RawBytes info for snapshot columns, only for user ones. - for (auto& [columnId, col] : indexInfo.Columns) { - const auto& columnName = col.Name; - auto column = batch->GetColumnByName(col.Name); - Y_VERIFY(column); - Meta.ColumnMeta[columnId].NumRows = column->length(); - Meta.ColumnMeta[columnId].RawBytes = NArrow::GetArrayDataSize(column); - - if (minMaxColumns.contains(columnId)) { - auto column = batch->GetColumnByName(columnName); - Y_VERIFY(column); - - const bool isSorted = (columnId == Meta.FirstPkColumn); - AddMinMax(columnId, column, isSorted); - Y_VERIFY(Meta.HasMinMax(columnId)); - } - } + Meta.FillBatchInfo(batch, indexInfo); + Meta.SetTierName(tierName); } -TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const { - NKikimrTxColumnShard::TIndexColumnMeta meta; // TODO: move proto serialization out of engines folder - if (Meta.ColumnMeta.contains(rec.ColumnId)) { - const auto& columnMeta = Meta.ColumnMeta.find(rec.ColumnId)->second; - if (auto numRows = columnMeta.NumRows) { - meta.SetNumRows(numRows); - } - if (auto rawBytes = columnMeta.RawBytes) { - meta.SetRawBytes(rawBytes); - } - if (columnMeta.HasMinMax()) { - ScalarToConstant(*columnMeta.Min, *meta.MutableMinValue()); - ScalarToConstant(*columnMeta.Max, *meta.MutableMaxValue()); - } - } - - if (rec.ColumnId == Meta.FirstPkColumn) { - auto* portionMeta = meta.MutablePortionMeta(); - - switch (Meta.Produced) { - case TPortionMeta::EProduced::UNSPECIFIED: - Y_VERIFY(false); - case TPortionMeta::EProduced::INSERTED: - portionMeta->SetIsInserted(true); - break; - case TPortionMeta::EProduced::COMPACTED: - portionMeta->SetIsCompacted(true); - break; - case TPortionMeta::EProduced::SPLIT_COMPACTED: - portionMeta->SetIsSplitCompacted(true); - break; - case TPortionMeta::EProduced::EVICTED: - portionMeta->SetIsEvicted(true); - break; - case TPortionMeta::EProduced::INACTIVE: - Y_FAIL("Unexpected inactive case"); - //portionMeta->SetInactive(true); - break; - } - - if (!TierName.empty()) { - portionMeta->SetTierName(TierName); - } - - if (const auto& keyEdgesBatch = Meta.ReplaceKeyEdges) { - Y_VERIFY(keyEdgesBatch); - Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok()); - Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2); - portionMeta->SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch)); - } - } - - TString out; - Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out); - return out; -} - -void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec) { - if (rec.Metadata.empty()) { - return; - } - - NKikimrTxColumnShard::TIndexColumnMeta meta; - bool ok = meta.ParseFromString(rec.Metadata); - Y_VERIFY(ok); - - Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); - auto field = indexInfo.ArrowColumnField(rec.ColumnId); - const bool compositeIndexKey = indexInfo.IsCompositeIndexKey(); - - if (meta.HasPortionMeta()) { - Y_VERIFY_DEBUG(rec.ColumnId == Meta.FirstPkColumn); - - auto& portionMeta = meta.GetPortionMeta(); - TierName = portionMeta.GetTierName(); - - if (portionMeta.GetIsInserted()) { - Meta.Produced = TPortionMeta::EProduced::INSERTED; - } else if (portionMeta.GetIsCompacted()) { - Meta.Produced = TPortionMeta::EProduced::COMPACTED; - } else if (portionMeta.GetIsSplitCompacted()) { - Meta.Produced = TPortionMeta::EProduced::SPLIT_COMPACTED; - } else if (portionMeta.GetIsEvicted()) { - Meta.Produced = TPortionMeta::EProduced::EVICTED; - } - - if (portionMeta.HasPrimaryKeyBorders()) { - Meta.ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); - Y_VERIFY(Meta.ReplaceKeyEdges); - Y_VERIFY_DEBUG(Meta.ReplaceKeyEdges->ValidateFull().ok()); - Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2); - - if (compositeIndexKey) { - auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey()); - Y_VERIFY(edgesBatch); - Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0); - Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1); +std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const { + std::shared_ptr<arrow::Scalar> result; + for (auto&& i : Records) { + if (i.ColumnId == columnId) { + if (!i.GetMeta().GetMin()) { + return nullptr; + } + if (!result || NArrow::ScalarCompare(result, i.GetMeta().GetMin()) > 0) { + result = i.GetMeta().GetMin(); } } } - if (meta.HasNumRows()) { - Meta.ColumnMeta[rec.ColumnId].NumRows = meta.GetNumRows(); - } - if (meta.HasRawBytes()) { - Meta.ColumnMeta[rec.ColumnId].RawBytes = meta.GetRawBytes(); - } - if (meta.HasMinValue()) { - auto scalar = ConstantToScalar(meta.GetMinValue(), field->type()); - Meta.ColumnMeta[rec.ColumnId].Min = scalar; - - // Restore Meta.IndexKeyStart for one column IndexKey - if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) { - Meta.IndexKeyStart = NArrow::TReplaceKey::FromScalar(scalar); - } - } - if (meta.HasMaxValue()) { - auto scalar = ConstantToScalar(meta.GetMaxValue(), field->type()); - Meta.ColumnMeta[rec.ColumnId].Max = scalar; - - // Restore Meta.IndexKeyEnd for one column IndexKey - if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) { - Meta.IndexKeyEnd = NArrow::TReplaceKey::FromScalar(scalar); - } - } - - // Portion genarated without PrimaryKeyBorders and loaded with indexInfo.IsCompositeIndexKey() - // We should have no such portions for ForceColumnTablesCompositeMarks feature - if (rec.ColumnId == Meta.FirstPkColumn) { - Y_VERIFY(Meta.IndexKeyStart && Meta.IndexKeyEnd); - } -} - -std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const { - if (!Meta.ColumnMeta.contains(columnId)) { - return {}; - } - return Meta.ColumnMeta.find(columnId)->second.Min; + return result; } std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { - auto it = Meta.ColumnMeta.find(columnId); - if (it == Meta.ColumnMeta.end()) { - return {}; + std::shared_ptr<arrow::Scalar> result; + for (auto&& i : Records) { + if (i.ColumnId == columnId) { + if (!i.GetMeta().GetMax()) { + return nullptr; + } + if (!result || NArrow::ScalarCompare(result, i.GetMeta().GetMax()) < 0) { + result = i.GetMeta().GetMax(); + } + } } - return it->second.Max; + return result; } TPortionInfo TPortionInfo::CopyWithFilteredColumns(const THashSet<ui32>& columnIds) const { @@ -256,9 +87,10 @@ ui64 TPortionInfo::GetRawBytes(const std::vector<ui32>& columnIds) const { if (TIndexInfo::IsSpecialColumn(i)) { sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i); } else { - auto it = Meta.ColumnMeta.find(i); - if (it != Meta.ColumnMeta.end()) { - sum += it->second.RawBytes; + for (auto&& r : Records) { + if (r.ColumnId == i) { + sum += r.GetMeta().GetRawBytes(); + } } } } @@ -283,14 +115,47 @@ TString TPortionInfo::DebugString() const { if (RemoveSnapshot.Valid()) { sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");"; } - sb << "meta:(" << Meta << ");"; + sb << "meta:(" << Meta.DebugString() << ");"; sb << "chunks:(" << Records.size() << ");"; - if (TierName) { - sb << "tier:" << TierName << ";"; - } return sb << ")"; } +void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta) { + Records.push_back(rec); + + if (portionMeta) { + Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); + Y_VERIFY(Meta.DeserializeFromProto(*portionMeta, indexInfo)); + } + if (!indexInfo.IsCompositeIndexKey() && indexInfo.GetPKFirstColumnId() == rec.ColumnId) { + if (rec.GetMeta().GetMin()) { + auto candidate = NArrow::TReplaceKey::FromScalar(rec.GetMeta().GetMin()); + if (!Meta.IndexKeyStart || candidate < *Meta.IndexKeyStart) { + Meta.IndexKeyStart = candidate; + } + } + if (rec.GetMeta().GetMax()) { + auto candidate = NArrow::TReplaceKey::FromScalar(rec.GetMeta().GetMax()); + if (!Meta.IndexKeyEnd || *Meta.IndexKeyEnd < candidate) { + Meta.IndexKeyEnd = candidate; + } + } + } +} + +bool TPortionInfo::HasPkMinMax() const { + bool result = false; + for (auto&& i : Records) { + if (i.ColumnId == Meta.FirstPkColumn) { + if (!i.GetMeta().HasMinMax()) { + return false; + } + result = true; + } + } + return result; +} + std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { Y_VERIFY(!Blobs.empty()); @@ -308,7 +173,7 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() c std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; - std::vector< std::shared_ptr<arrow::Field>> fields; + std::vector<std::shared_ptr<arrow::Field>> fields; for (auto&& i : Columns) { if (!options.IsAcceptedColumn(i.GetColumnId())) { continue; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 455e102497..267e0d4570 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -17,16 +17,38 @@ private: ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in granule TSnapshot MinSnapshot = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion} TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one) + + bool HasPkMinMax() const; + TPortionMeta Meta; public: static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024; + void ResetMeta() { + Meta = TPortionMeta(); + } + + const TPortionMeta& GetMeta() const { + return Meta; + } + + TPortionMeta& MutableMeta() { + return Meta; + } + std::vector<TColumnRecord> Records; - TPortionMeta Meta; - TString TierName; + + const TColumnRecord* GetRecordPointer(const TChunkAddress& address) const { + for (auto&& i : Records) { + if (i.GetAddress() == address) { + return &i; + } + } + return nullptr; + } bool Empty() const { return Records.empty(); } bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; } - bool Valid() const { return MinSnapshot.Valid() && Granule && Portion && !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } + bool Valid() const { return MinSnapshot.Valid() && Granule && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && Granule && Portion; } bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; } bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; } @@ -50,7 +72,6 @@ public: , Portion(portionId) , MinSnapshot(minSnapshot) { - } TString DebugString() const; @@ -153,21 +174,12 @@ public: void UpdateRecordsMeta(TPortionMeta::EProduced produced) { Meta.Produced = produced; - for (auto& record : Records) { - record.Metadata = GetMetadata(record); - } } - void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) { - Records.push_back(rec); - LoadMetadata(indexInfo, rec); - } + void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta); - TString GetMetadata(const TColumnRecord& rec) const; - void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec); void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName); - void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const; std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const; @@ -183,17 +195,35 @@ public: } ui32 NumRows() const { - return Meta.NumRows(); + ui32 result = 0; + std::optional<ui32> columnIdFirst; + for (auto&& i : Records) { + if (!columnIdFirst || *columnIdFirst == i.ColumnId) { + result += i.GetMeta().GetNumRows(); + columnIdFirst = i.ColumnId; + } + } + return result; + } + + ui32 NumRows(const ui32 columnId) const { + ui32 result = 0; + for (auto&& i : Records) { + if (columnId == i.ColumnId) { + result += i.GetMeta().GetNumRows(); + } + } + return result; } ui64 GetRawBytes(const std::vector<ui32>& columnIds) const; ui64 RawBytesSum() const { - ui64 sum = 0; - for (auto& [columnId, colMeta] : Meta.ColumnMeta) { - sum += colMeta.RawBytes; + ui64 result = 0; + for (auto&& i : Records) { + result += i.GetMeta().GetRawBytes(); } - return sum; + return result; } private: @@ -354,8 +384,7 @@ public: std::vector<TPreparedColumn> columns; columns.reserve(resultSchema.GetSchema()->num_fields()); - Y_VERIFY(!Meta.ColumnMeta.empty()); - const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows; + const ui32 rowsCount = NumRows(); for (auto&& field : resultSchema.GetSchema()->fields()) { columns.emplace_back(TPreparedColumn({ TAssembleBlobInfo(rowsCount) }, resultSchema.GetColumnLoader(field->name()))); } @@ -372,10 +401,7 @@ public: Y_ASSERT(pos >= 0); positionsMap[resulPos] = pos; Y_VERIFY(columnChunks[resulPos].emplace(rec.Chunk, rec.BlobRange).second); - auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId); - if (columnMeta) { - Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows); - } + Y_VERIFY_S(rowsCount == NumRows(rec.ColumnId), TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << NumRows(rec.ColumnId)); } // Make chunked arrays for columns diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp index cc07cafb38..56d228ce93 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp @@ -2,10 +2,10 @@ namespace NKikimr::NOlap { -const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk) { +const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info) { Y_VERIFY(!ResultBlob); const ui32 columnId = chunk.GetColumnId(); - auto rec = TColumnRecord::Make(columnId, owner.ColumnChunkIds[columnId]++); + TColumnRecord rec(TChunkAddress(columnId, owner.ColumnChunkIds[columnId]++), chunk.GetColumn(), info); rec.BlobRange.Offset = Size; rec.BlobRange.Size = chunk.GetData().size(); auto& result = owner.PortionInfo.AppendOneChunkColumn(std::move(rec)); diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h index 5dc70c0aac..80fb9cbb88 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h @@ -33,7 +33,7 @@ public: return *ResultBlob; } - const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk); + const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info); void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId); }; diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make index 394c17d6ad..2af9b511fd 100644 --- a/ydb/core/tx/columnshard/engines/portions/ya.make +++ b/ydb/core/tx/columnshard/engines/portions/ya.make @@ -5,6 +5,7 @@ SRCS( column_record.cpp with_blobs.cpp meta.cpp + common.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 7ad41d34bb..20f0e21a7e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -167,6 +167,7 @@ public: /// Returns whether the sorting keys defined. bool IsSorted() const { return SortingKey.get(); } + bool IsSortedColumn(const ui32 columnId) const { return GetPKFirstColumnId() == columnId; } /// Returns whether the replace keys defined. bool IsReplacing() const { return ReplaceKey.get(); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 7cc5d05761..394c536013 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -28,7 +28,7 @@ ui64 TGranuleMeta::Size() const { } void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("granule", GetGranuleId()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("granule", GetGranuleId()); auto it = Portions.find(info.GetPortion()); AFL_VERIFY(info.GetGranule() == GetGranuleId())("event", "incompatible_granule")("portion", info.DebugString())("granule", GetGranuleId()); @@ -52,10 +52,10 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { bool TGranuleMeta::ErasePortion(const ui64 portion) { auto it = Portions.find(portion); if (it == Portions.end()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased_already")("portion_id", portion)("pathId", Record.PathId); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased_already")("portion_id", portion)("pathId", Record.PathId); return false; } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second)("pathId", Record.PathId); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second)("pathId", Record.PathId); } OnBeforeChangePortion(&it->second, nullptr); Portions.erase(it); @@ -63,20 +63,20 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) { return true; } -void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec) { +void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta) { auto it = Portions.find(portion.GetPortion()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "add_column_record")("portion_info", portion.DebugString())("record", rec.DebugString()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_column_record")("portion_info", portion.DebugString())("record", rec.DebugString()); if (it == Portions.end()) { Y_VERIFY(portion.Records.empty()); auto portionNew = portion; - portionNew.AddRecord(indexInfo, rec); + portionNew.AddRecord(indexInfo, rec, portionMeta); OnBeforeChangePortion(nullptr, &portionNew); Portions.emplace(portion.GetPortion(), std::move(portionNew)); OnAfterChangePortion(); } else { Y_VERIFY(it->second.IsEqualWithSnapshots(portion)); auto portionNew = it->second; - portionNew.AddRecord(indexInfo, rec); + portionNew.AddRecord(indexInfo, rec, portionMeta); OnBeforeChangePortion(&it->second, &portionNew); it->second = std::move(portionNew); OnAfterChangePortion(); @@ -96,7 +96,7 @@ void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, cons blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size; } for (auto&& i : blobIdSize) { - PortionInfoGuard.OnDropBlob(portionBefore->IsActive() ? portionBefore->Meta.Produced : NPortion::EProduced::INACTIVE, i.second); + PortionInfoGuard.OnDropBlob(portionBefore->IsActive() ? portionBefore->GetMeta().Produced : NPortion::EProduced::INACTIVE, i.second); } } if (portionAfter) { @@ -105,7 +105,7 @@ void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, cons blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size; } for (auto&& i : blobIdSize) { - PortionInfoGuard.OnNewBlob(portionAfter->IsActive() ? portionAfter->Meta.Produced : NPortion::EProduced::INACTIVE, i.second); + PortionInfoGuard.OnNewBlob(portionAfter->IsActive() ? portionAfter->GetMeta().Produced : NPortion::EProduced::INACTIVE, i.second); } } if (!!AdditiveSummaryCache) { diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 748453314e..2cb355e798 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -366,7 +366,7 @@ public: const TGranuleRecord Record; - void AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec); + void AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta); const THashMap<ui64, TPortionInfo>& GetPortions() const { return Portions; diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index c973a2cc9d..a4078db93b 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -1,8 +1,11 @@ -#include <library/cpp/testing/unittest/registar.h> -#include <util/string/printf.h> #include "db_wrapper.h" #include "insert_table/insert_table.h" +#include <ydb/core/tx/columnshard/columnshard_schema.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <util/string/printf.h> + namespace NKikimr { using namespace NOlap; @@ -30,7 +33,7 @@ public: void WriteColumn(ui32, const TPortionInfo&, const TColumnRecord&) override {} void EraseColumn(ui32, const TPortionInfo&, const TColumnRecord&) override {} - bool LoadColumns(ui32, const std::function<void(const TPortionInfo&, const TColumnRecord&)>&) override { return true; } + bool LoadColumns(ui32, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>&) override { return true; } void WriteCounter(ui32, ui32, ui64) override {} bool LoadCounters(ui32, const std::function<void(ui32 id, ui64 value)>&) override { return true; } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 4bb0810c1f..fb4c91d3f8 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -15,6 +15,8 @@ using TTypeInfo = NScheme::TTypeInfo; namespace { class TTestDbWrapper : public IDbWrapper { +private: + std::map<TPortionAddress, std::map<TChunkAddress, TColumnChunkLoadContext>> LoadContexts; public: struct TIndex { THashMap<ui64, std::vector<TGranuleRecord>> Granules; // pathId -> granule @@ -103,7 +105,18 @@ public: } void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) override { + auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk); + auto rowProto = row.GetMeta().SerializeToProto(); + if (proto) { + *rowProto.MutablePortionMeta() = std::move(*proto); + } + auto& data = Indices[index].Columns[portion.GetGranule()]; + NOlap::TColumnChunkLoadContext loadContext(row.GetAddress(), row.BlobRange, rowProto); + auto itInsertInfo = LoadContexts[portion.GetAddress()].emplace(row.GetAddress(), loadContext); + if (!itInsertInfo.second) { + itInsertInfo.first->second = loadContext; + } auto it = data.find(portion.GetPortion()); if (it == data.end()) { it = data.emplace(portion.GetPortion(), portion.CopyWithFilteredColumns({})).first; @@ -141,14 +154,18 @@ public: portionLocal.Records.swap(filtered); } - bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnRecord&)>& callback) override { + bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override { auto& columns = Indices[index].Columns; for (auto& [granule, portions] : columns) { for (auto& [portionId, portionLocal] : portions) { auto copy = portionLocal; + copy.ResetMeta(); copy.Records.clear(); for (const auto& rec : portionLocal.Records) { - callback(copy, rec); + auto itContextLoader = LoadContexts[copy.GetAddress()].find(rec.GetAddress()); + Y_VERIFY(itContextLoader != LoadContexts[copy.GetAddress()].end()); + callback(copy, itContextLoader->second); + LoadContexts[copy.GetAddress()].erase(itContextLoader); } } } diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp index d5390412f8..fb6f7f6619 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.cpp +++ b/ydb/core/tx/columnshard/splitter/chunks.cpp @@ -1,4 +1,5 @@ #include "chunks.h" +#include <ydb/core/formats/arrow/arrow_helpers.h> namespace NKikimr::NOlap { @@ -13,4 +14,8 @@ std::vector<TSplittedColumnChunk> TSplittedColumnChunk::InternalSplit(const TCol return newChunks; } +TString TOrderedColumnChunk::DebugString() const { + return TStringBuilder() << "column_id=" << ColumnId << ";data_size=" << Data.size() << ";records_count=" << Column->length() << ";data=" << NArrow::DebugJson(Column, 3, 3) << ";"; +} + } diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index af5b28bacd..921e13229d 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -106,20 +106,27 @@ public: class TOrderedColumnChunk { private: YDB_READONLY(ui32, ColumnId, 0); - YDB_READONLY(ui32, RecordsCount, 0); YDB_READONLY_DEF(TString, Data); + std::shared_ptr<arrow::Array> Column; public: - TOrderedColumnChunk(const ui32 columnId, const ui32 recordsCount, const TString& data) + std::shared_ptr<arrow::Array> GetColumn() const { + return Column; + } + + ui32 GetRecordsCount() const { + return Column->length(); + } + + TOrderedColumnChunk(const ui32 columnId, const TString& data, std::shared_ptr<arrow::Array> column) : ColumnId(columnId) - , RecordsCount(recordsCount) , Data(std::move(data)) + , Column(column) { + Y_VERIFY(Column); } - TString DebugString() const { - return TStringBuilder() << "column_id=" << ColumnId << ";data_size=" << Data.size() << ";records_count=" << RecordsCount << ";"; - } + TString DebugString() const; }; } diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp index a5210a72f2..7892859ff4 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp @@ -91,7 +91,7 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<TOrderedColumnChunk>>& portio } std::vector<TOrderedColumnChunk> chunksForBlob; for (auto&& c : i.GetChunks()) { - chunksForBlob.emplace_back(c.GetColumnId(), c.GetData().GetRecordsCount(), c.GetData().GetSerializedChunk()); + chunksForBlob.emplace_back(c.GetColumnId(), c.GetData().GetSerializedChunk(), c.GetData().GetColumn()); } result.emplace_back(std::move(chunksForBlob)); } diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h index 54fba342be..4e41d4d6e7 100644 --- a/ydb/core/tx/columnshard/splitter/simple.h +++ b/ydb/core/tx/columnshard/splitter/simple.h @@ -11,6 +11,10 @@ private: YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, SlicedBatch); YDB_READONLY_DEF(TString, SerializedChunk); public: + std::shared_ptr<arrow::Array> GetColumn() const { + return SlicedBatch->column(0); + } + ui32 GetRecordsCount() const { return SlicedBatch->num_rows(); } @@ -19,7 +23,10 @@ public: TSaverSplittedChunk(std::shared_ptr<arrow::RecordBatch> batch, TString&& serializedChunk) : SlicedBatch(batch) - , SerializedChunk(std::move(serializedChunk)) { + , SerializedChunk(std::move(serializedChunk)) + { + Y_VERIFY(SlicedBatch); + Y_VERIFY(SlicedBatch->num_columns() == 1); } diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index e748014c17..c0b18ff858 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -27,6 +27,7 @@ SRCS( columnshard_impl.cpp columnshard_common.cpp columnshard_private_events.cpp + columnshard_schema.cpp counters.cpp compaction_actor.cpp defs.cpp |