aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-08-16 15:16:27 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-08-16 16:39:12 +0300
commit5c2405ab8c1e8cf1ba22505d529bdc0d96ecf675 (patch)
tree243e5bc1fab3f529efe4aaf31dc6c06933ade14e
parent22d756319c13b10fe917926367e9919ac205f05e (diff)
downloadydb-5c2405ab8c1e8cf1ba22505d529bdc0d96ecf675.tar.gz
KIKIMR-18932: meta incapsulation for merging from different chunks
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.cpp28
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h101
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.cpp21
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp48
-rw-r--r--ydb/core/tx/columnshard/engines/columns_table.h2
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.h5
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.cpp68
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.h56
-rw-r--r--ydb/core/tx/columnshard/engines/portions/common.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/portions/common.h29
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.cpp107
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.h64
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp269
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h76
-rw-r--r--ydb/core/tx/columnshard/engines/portions/with_blobs.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/portions/with_blobs.h2
-rw-r--r--ydb/core/tx/columnshard/engines/portions/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp18
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h2
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp21
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.cpp5
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.h19
-rw-r--r--ydb/core/tx/columnshard/splitter/rb_splitter.cpp2
-rw-r--r--ydb/core/tx/columnshard/splitter/simple.h9
-rw-r--r--ydb/core/tx/columnshard/ya.make1
37 files changed, 609 insertions, 383 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index c0473293c9..5a7cf7c09e 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -87,6 +87,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 25dbd82022..a059a18ad9 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -88,6 +88,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 25dbd82022..a059a18ad9 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -88,6 +88,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index c0473293c9..5a7cf7c09e 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -87,6 +87,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp
new file mode 100644
index 0000000000..3ec637e786
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard_schema.cpp
@@ -0,0 +1,28 @@
+#include "columnshard_schema.h"
+
+namespace NKikimr::NColumnShard {
+
+bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index, const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback) {
+ auto rowset = db.Table<IndexColumns>().Prefix(index).Select();
+ if (!rowset.IsReady())
+ return false;
+
+ while (!rowset.EndOfSet()) {
+ NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty();
+ portion.SetGranule(rowset.GetValue<IndexColumns::Granule>());
+ portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>());
+ portion.SetPortion(rowset.GetValue<IndexColumns::Portion>());
+
+ NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, dsGroupSelector);
+
+ portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>());
+
+ callback(portion, chunkLoadContext);
+
+ if (!rowset.Next())
+ return false;
+ }
+ return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index e4b8538503..d4e734b727 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -12,6 +12,10 @@
#include <type_traits>
+namespace NKikimr::NOlap {
+class TColumnChunkLoadContext;
+}
+
namespace NKikimr::NColumnShard {
using NOlap::TWriteId;
@@ -592,15 +596,20 @@ struct Schema : NIceDb::Schema {
// IndexColumns activities
static void IndexColumns_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
+ auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk);
+ auto rowProto = row.GetMeta().SerializeToProto();
+ if (proto) {
+ *rowProto.MutablePortionMeta() = std::move(*proto);
+ }
db.Table<IndexColumns>().Key(index, portion.GetGranule(), row.ColumnId,
portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update(
- NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()),
- NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()),
- NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()),
- NIceDb::TUpdate<IndexColumns::Metadata>(row.Metadata),
- NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset),
- NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size)
- );
+ NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()),
+ NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()),
+ NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()),
+ NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()),
+ NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset),
+ NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size)
+ );
}
static void IndexColumns_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
@@ -609,37 +618,7 @@ struct Schema : NIceDb::Schema {
}
static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index,
- const std::function<void(const NOlap::TPortionInfo&, const TColumnRecord&)>& callback) {
- auto rowset = db.Table<IndexColumns>().Prefix(index).Select();
- if (!rowset.IsReady())
- return false;
-
- while (!rowset.EndOfSet()) {
- TColumnRecord row;
- NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty();
- portion.SetGranule(rowset.GetValue<IndexColumns::Granule>());
- row.ColumnId = rowset.GetValue<IndexColumns::ColumnIdx>();
- portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>());
- portion.SetPortion(rowset.GetValue<IndexColumns::Portion>());
- row.Chunk = rowset.GetValue<IndexColumns::Chunk>();
- portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>());
-
- TString strBlobId = rowset.GetValue<IndexColumns::Blob>();
- row.Metadata = rowset.GetValue<IndexColumns::Metadata>();
- row.BlobRange.Offset = rowset.GetValue<IndexColumns::Offset>();
- row.BlobRange.Size = rowset.GetValue<IndexColumns::Size>();
-
- Y_VERIFY(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
- TLogoBlobID logoBlobId((const ui64*)strBlobId.data());
- row.BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
-
- callback(portion, row);
-
- if (!rowset.Next())
- return false;
- }
- return true;
- }
+ const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback);
// IndexCounters
@@ -688,3 +667,49 @@ struct Schema : NIceDb::Schema {
};
}
+
+namespace NKikimr::NOlap {
+class TColumnChunkLoadContext {
+private:
+ YDB_READONLY_DEF(TBlobRange, BlobRange);
+ TChunkAddress Address;
+ YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
+public:
+ const TChunkAddress& GetAddress() const {
+ return Address;
+ }
+
+ TColumnChunkLoadContext(const TChunkAddress& address, const TBlobRange& bRange, const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
+ : BlobRange(bRange)
+ , Address(address)
+ , MetaProto(metaProto)
+ {
+
+ }
+
+ template <class TSource>
+ TColumnChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
+ : Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) {
+ AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
+ TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Blob>();
+ Y_VERIFY(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
+ TLogoBlobID logoBlobId((const ui64*)strBlobId.data());
+ BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
+ BlobRange.Offset = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Offset>();
+ BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Size>();
+ AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString());
+
+ const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Metadata>();
+ AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
+ }
+
+ const NKikimrTxColumnShard::TIndexPortionMeta* GetPortionMeta() const {
+ if (MetaProto.HasPortionMeta()) {
+ return &MetaProto.GetPortionMeta();
+ } else {
+ return nullptr;
+ }
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index 315cfbe91c..073823a663 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -38,7 +38,7 @@ bool TTTLColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyC
const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion);
Y_VERIFY(oldInfo.IsActive());
- Y_VERIFY(portionInfo.TierName != oldInfo.TierName);
+ Y_VERIFY(portionInfo.GetMeta().GetTierName() != oldInfo.GetMeta().GetTierName());
self.UpsertPortion(portionInfo, &oldInfo);
@@ -60,7 +60,7 @@ void TTTLColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWr
auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
// Mark exported blobs
if (evictionFeatures.NeedExport) {
- auto& tierName = portionInfo.TierName;
+ auto& tierName = portionInfo.GetMeta().GetTierName();
Y_VERIFY(!tierName.empty());
for (auto& rec : portionInfo.Records) {
@@ -161,14 +161,14 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portio
const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords,
TConstructionContext& context) const {
TPortionInfo& portionInfo = portionInfoWithBlobs.GetPortionInfo();
- Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName);
+ Y_VERIFY(portionInfo.GetMeta().GetTierName() != evictFeatures.TargetTierName);
auto* tiering = Tiering.FindPtr(evictFeatures.PathId);
Y_VERIFY(tiering);
auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
if (!compression) {
// Noting to recompress. We have no other kinds of evictions yet.
- portionInfo.TierName = evictFeatures.TargetTierName;
+ portionInfo.MutableMeta().SetTierName(evictFeatures.TargetTierName);
evictFeatures.DataChanges = false;
return true;
}
@@ -191,21 +191,22 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfoWithBlobs& portio
auto field = resultSchema->GetFieldByIndex(pos);
TString blob;
+ std::shared_ptr<arrow::RecordBatch> rb;
{
auto it = srcBlobs.find(rec.BlobRange);
Y_VERIFY(it != srcBlobs.end());
- auto rb = resultSchema->GetColumnLoader(rec.ColumnId)->Apply(it->second);
- Y_VERIFY(rb.ok());
+ rb = NArrow::TStatusValidator::GetValid(resultSchema->GetColumnLoader(rec.ColumnId)->Apply(it->second));
auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext);
- blob = columnSaver.Apply(*rb);
+ blob = columnSaver.Apply(rb);
}
+ Y_VERIFY(rb->num_columns() == 1);
if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
return false;
}
if (portionInfoWithBlobs.GetBlobs().empty() || portionInfoWithBlobs.GetBlobs().back().GetSize() + blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
- portionInfoWithBlobs.StartBlob(0).AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, 0, blob));
+ portionInfoWithBlobs.StartBlob(0).AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, blob, rb->column(0)), blobSchema->GetIndexInfo());
} else {
- portionInfoWithBlobs.GetBlobs().back().AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, 0, blob));
+ portionInfoWithBlobs.GetBlobs().back().AddChunk(portionInfoWithBlobs, TOrderedColumnChunk(rec.ColumnId, blob, rb->column(0)), blobSchema->GetIndexInfo());
}
}
@@ -230,7 +231,7 @@ NKikimr::TConclusionStatus TTTLColumnEngineChanges::DoConstructBlobs(TConstructi
for (auto& [portionInfo, evictFeatures] : PortionsToEvict) {
if (UpdateEvictedPortion(portionInfo, evictFeatures, Blobs, EvictedRecords, context)) {
- Y_VERIFY(portionInfo.GetPortionInfo().TierName == evictFeatures.TargetTierName);
+ Y_VERIFY(portionInfo.GetPortionInfo().GetMeta().GetTierName() == evictFeatures.TargetTierName);
evicted.emplace_back(std::move(portionInfo), evictFeatures);
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index a31e93a0bf..71a61f013d 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -18,7 +18,7 @@ void TChangesWithAppend::DoDebugString(TStringOutput& out) const {
void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& /*context*/) {
for (auto& portionInfo : AppendedPortions) {
- switch (portionInfo.GetPortionInfo().Meta.Produced) {
+ switch (portionInfo.GetPortionInfo().GetMeta().Produced) {
case NOlap::TPortionMeta::EProduced::UNSPECIFIED:
Y_VERIFY(false); // unexpected
case NOlap::TPortionMeta::EProduced::INSERTED:
@@ -113,7 +113,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(
auto& blobInfo = infoWithBlob.StartBlob(blob.size());
for (auto&& chunk : blob) {
const TString data = chunk.GetData();
- srcBlobs.emplace(blobInfo.AddChunk(infoWithBlob, std::move(chunk)).BlobRange, data);
+ srcBlobs.emplace(blobInfo.AddChunk(infoWithBlob, std::move(chunk), resultSchema->GetIndexInfo()).BlobRange, data);
}
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_appended", infoWithBlob.GetPortionInfo().DebugString());
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 8f43be3fc9..aad865264e 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/tx/columnshard/columnshard_ttl.h>
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/library/conclusion/status.h>
#include "changes/indexation.h"
#include "changes/in_granule_compaction.h"
@@ -98,12 +99,10 @@ TColumnEngineStats::TPortionsStats DeltaStats(const TPortionInfo& portionInfo, u
TColumnEngineStats::TPortionsStats deltaStats;
THashSet<TUnifiedBlobId> blobs;
for (auto& rec : portionInfo.Records) {
- metadataBytes += rec.Metadata.size();
+ metadataBytes += rec.GetMeta().GetMetadataSize();
blobs.insert(rec.BlobRange.BlobId);
deltaStats.BytesByColumn[rec.ColumnId] += rec.BlobRange.Size;
- }
- for (auto& rec : portionInfo.Meta.ColumnMeta) {
- deltaStats.RawBytesByColumn[rec.first] += rec.second.RawBytes;
+ deltaStats.RawBytesByColumn[rec.ColumnId] += rec.GetMeta().GetRawBytes();
}
deltaStats.Rows = portionInfo.NumRows();
deltaStats.RawBytes = portionInfo.RawBytesSum();
@@ -123,16 +122,16 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
ui64 metadataBytes = 0;
TColumnEngineStats::TPortionsStats deltaStats = DeltaStats(portionInfo, metadataBytes);
- Y_VERIFY(!exPortionInfo || exPortionInfo->Meta.Produced != TPortionMeta::EProduced::UNSPECIFIED);
- Y_VERIFY(portionInfo.Meta.Produced != TPortionMeta::EProduced::UNSPECIFIED);
+ Y_VERIFY(!exPortionInfo || exPortionInfo->GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED);
+ Y_VERIFY(portionInfo.GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED);
TColumnEngineStats::TPortionsStats& srcStats = exPortionInfo
? (exPortionInfo->IsActive()
- ? engineStats.StatsByType[exPortionInfo->Meta.Produced]
+ ? engineStats.StatsByType[exPortionInfo->GetMeta().Produced]
: engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE])
- : engineStats.StatsByType[portionInfo.Meta.Produced];
+ : engineStats.StatsByType[portionInfo.GetMeta().Produced];
TColumnEngineStats::TPortionsStats& stats = portionInfo.IsActive()
- ? engineStats.StatsByType[portionInfo.Meta.Produced]
+ ? engineStats.StatsByType[portionInfo.GetMeta().Produced]
: engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE];
const bool isErase = updateType == EStatsUpdateType::ERASE;
@@ -236,16 +235,19 @@ bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) {
}
bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) {
- return ColumnsTable->Load(db, [&](const TPortionInfo& portion, const TColumnRecord& rec) {
- auto& indexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
+ TSnapshot lastSnapshot(0, 0);
+ const TIndexInfo* currentIndexInfo = nullptr;
+ return ColumnsTable->Load(db, [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
+ if (!currentIndexInfo || lastSnapshot != portion.GetMinSnapshot()) {
+ currentIndexInfo = &VersionedIndex.GetSchema(portion.GetMinSnapshot())->GetIndexInfo();
+ lastSnapshot = portion.GetMinSnapshot();
+ }
Y_VERIFY(portion.ValidSnapshotInfo());
- Y_VERIFY(rec.Valid());
// Do not count the blob as lost since it exists in the index.
- lostBlobs.erase(rec.BlobRange.BlobId);
+ lostBlobs.erase(loadContext.GetBlobRange().BlobId);
// Locate granule and append the record.
- const auto gi = Granules.find(portion.GetGranule());
- Y_VERIFY(gi != Granules.end());
- gi->second->AddColumnRecord(indexInfo, portion, rec);
+ TColumnRecord rec(loadContext, *currentIndexInfo);
+ GetGranulePtrVerified(portion.GetGranule())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta());
});
}
@@ -327,6 +329,7 @@ std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompacti
std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size());
auto changes = TChangesConstructor::BuildCleanupChanges(snapshot);
ui32 affectedRecords = 0;
@@ -364,6 +367,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
}
if (affectedRecords > maxRecords) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size());
return changes;
}
@@ -392,10 +396,12 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
break;
}
} else {
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup/Check")("snapshot", snapshot.DebugString())("portions_snapshot", portionInfo->GetRemoveSnapshot().DebugString());
Y_VERIFY(portionInfo->CheckForCleanup());
++it;
}
}
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size());
return changes;
}
@@ -446,13 +452,13 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
const TInstant maxTtlPortionInstant = *mpiOpt;
const TDuration d = maxTtlPortionInstant - expireTimestamp;
keep = !!d;
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestamp.Seconds());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestamp.Seconds());
if (d && dWaiting > d) {
dWaiting = d;
}
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop);
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop);
if (keep && tryEvictPortion) {
TString tierName;
for (auto& tierRef : ttl.GetOrderedTiers()) {
@@ -477,8 +483,8 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
break;
}
}
- if (info.TierName != tierName) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", info.TierName)("to", tierName);
+ if (info.GetMeta().GetTierName() != tierName) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", info.GetMeta().GetTierName())("to", tierName);
evictionSize += info.BlobsSizes().first;
const bool needExport = ttl.NeedExport(tierName);
context.Changes->AddPortionToEvict(info, TPortionEvictionFeatures(tierName, pathId, needExport));
@@ -678,7 +684,7 @@ static TMap<TSnapshot, std::vector<const TPortionInfo*>> GroupPortionsBySnapshot
if (visible) {
out[recSnapshot].push_back(&portionInfo);
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "GroupPortionsBySnapshot")("analyze_portion", portionInfo.DebugString())("visible", visible)("snapshot", snapshot.DebugString());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "GroupPortionsBySnapshot")("analyze_portion", portionInfo.DebugString())("visible", visible)("snapshot", snapshot.DebugString());
}
return out;
}
diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h
index 42f2eb87db..517a90c8a4 100644
--- a/ydb/core/tx/columnshard/engines/columns_table.h
+++ b/ydb/core/tx/columnshard/engines/columns_table.h
@@ -21,7 +21,7 @@ public:
db.EraseColumn(IndexId, portion, row);
}
- bool Load(IDbWrapper& db, std::function<void(const TPortionInfo&, const TColumnRecord&)> callback) {
+ bool Load(IDbWrapper& db, std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)> callback) {
return db.LoadColumns(IndexId, callback);
}
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index 5aba26b580..4bef7aa140 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -65,7 +65,7 @@ void TDbWrapper::EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, con
NColumnShard::Schema::IndexColumns_Erase(db, index, portion, row);
}
-bool TDbWrapper::LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnRecord&)>& callback) {
+bool TDbWrapper::LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, index, callback);
}
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h
index 9a1572590a..876a2d4c87 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.h
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.h
@@ -7,6 +7,7 @@ class TDatabase;
namespace NKikimr::NOlap {
+class TColumnChunkLoadContext;
struct TInsertedData;
class TInsertTableAccessor;
struct TColumnRecord;
@@ -34,7 +35,7 @@ public:
virtual void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual void EraseColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0;
- virtual bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnRecord&)>& callback) = 0;
+ virtual bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0;
virtual void WriteCounter(ui32 index, ui32 counterId, ui64 value) = 0;
virtual bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) = 0;
@@ -63,7 +64,7 @@ public:
void WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
void EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
- bool LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnRecord&)>& callback) override;
+ bool LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override;
void WriteCounter(ui32 index, ui32 counterId, ui64 value) override;
bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) override;
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt
index 6db0e70f31..b8559cce9c 100644
--- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.darwin-x86_64.txt
@@ -27,6 +27,7 @@ target_sources(columnshard-engines-portions PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/common.cpp
)
generate_enum_serilization(columnshard-engines-portions
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt
index 7c602514cd..679d414a1e 100644
--- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-aarch64.txt
@@ -28,6 +28,7 @@ target_sources(columnshard-engines-portions PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/common.cpp
)
generate_enum_serilization(columnshard-engines-portions
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt
index 7c602514cd..679d414a1e 100644
--- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.linux-x86_64.txt
@@ -28,6 +28,7 @@ target_sources(columnshard-engines-portions PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/common.cpp
)
generate_enum_serilization(columnshard-engines-portions
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h
diff --git a/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt
index 6db0e70f31..b8559cce9c 100644
--- a/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/portions/CMakeLists.windows-x86_64.txt
@@ -27,6 +27,7 @@ target_sources(columnshard-engines-portions PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/column_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/meta.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/common.cpp
)
generate_enum_serilization(columnshard-engines-portions
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portions/portion_info.h
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.cpp b/ydb/core/tx/columnshard/engines/portions/column_record.cpp
index 621928056a..8b69b6447d 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.cpp
@@ -1,5 +1,73 @@
#include "column_record.h"
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/tx/columnshard/common/scalars.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
namespace NKikimr::NOlap {
+TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo& indexInfo) {
+ auto field = indexInfo.ArrowColumnField(context.GetAddress().GetColumnId());
+ if (context.GetMetaProto().HasNumRows()) {
+ NumRows = context.GetMetaProto().GetNumRows();
+ }
+ if (context.GetMetaProto().HasRawBytes()) {
+ RawBytes = context.GetMetaProto().GetRawBytes();
+ }
+ if (context.GetMetaProto().HasMinValue()) {
+ Min = ConstantToScalar(context.GetMetaProto().GetMinValue(), field->type());
+ }
+ if (context.GetMetaProto().HasMaxValue()) {
+ Max = ConstantToScalar(context.GetMetaProto().GetMaxValue(), field->type());
+ }
+}
+
+TChunkMeta::TChunkMeta(const std::shared_ptr<arrow::Array>& column, const ui32 columnId, const TIndexInfo& indexInfo) {
+ Y_VERIFY(column);
+ Y_VERIFY(column->length());
+ NumRows = column->length();
+ RawBytes = NArrow::GetArrayDataSize(column);
+
+ if (indexInfo.GetMinMaxIdxColumns().contains(columnId)) {
+ std::pair<i32, i32> minMaxPos = {0, (column->length() - 1)};
+ if (!indexInfo.IsSortedColumn(columnId)) {
+ minMaxPos = NArrow::FindMinMaxPosition(column);
+ Y_VERIFY(minMaxPos.first >= 0);
+ Y_VERIFY(minMaxPos.second >= 0);
+ }
+
+ Min = NArrow::GetScalar(column, minMaxPos.first);
+ Max = NArrow::GetScalar(column, minMaxPos.second);
+
+ Y_VERIFY(Min);
+ Y_VERIFY(Max);
+ }
+}
+
+NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const {
+ NKikimrTxColumnShard::TIndexColumnMeta meta;
+ meta.SetNumRows(NumRows);
+ meta.SetRawBytes(RawBytes);
+ if (HasMinMax()) {
+ ScalarToConstant(*Min, *meta.MutableMinValue());
+ ScalarToConstant(*Max, *meta.MutableMaxValue());
+ }
+ return meta;
+}
+
+TColumnRecord::TColumnRecord(const TColumnChunkLoadContext& loadContext, const TIndexInfo& info)
+ : Meta(loadContext, info)
+ , ColumnId(loadContext.GetAddress().GetColumnId())
+ , Chunk(loadContext.GetAddress().GetChunk())
+ , BlobRange(loadContext.GetBlobRange())
+{
+}
+
+TColumnRecord::TColumnRecord(const TChunkAddress& address, const std::shared_ptr<arrow::Array>& column, const TIndexInfo& info)
+ : Meta(column, address.GetColumnId(), info)
+ , ColumnId(address.GetColumnId())
+ , Chunk(address.GetChunk())
+{
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h
index b36e8516de..13ca916783 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.h
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.h
@@ -1,22 +1,63 @@
#pragma once
+#include "common.h"
+#include <ydb/core/protos/tx_columnshard.pb.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/library/accessor/accessor.h>
+#include <util/string/builder.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::NOlap {
+class TColumnChunkLoadContext;
+struct TIndexInfo;
+
+struct TChunkMeta {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, Min);
+ YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, Max);
+ YDB_READONLY(ui32, NumRows, 0);
+ YDB_READONLY(ui32, RawBytes, 0);
+public:
+ ui64 GetMetadataSize() const {
+ return sizeof(NumRows) + sizeof(RawBytes) + 8 * 3 * 2;
+ }
+
+ bool HasMinMax() const noexcept {
+ return Min.get() && Max.get();
+ }
+
+ NKikimrTxColumnShard::TIndexColumnMeta SerializeToProto() const;
+
+ TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo& indexInfo);
+
+ TChunkMeta(const std::shared_ptr<arrow::Array>& column, const ui32 columnId, const TIndexInfo& indexInfo);
+};
struct TColumnRecord {
+private:
+ TChunkMeta Meta;
+public:
ui32 ColumnId = 0;
- ui16 Chunk; // Number of blob for column ColumnName in Portion
+ ui16 Chunk = 0;
TBlobRange BlobRange;
- TString Metadata;
+
+ const TChunkMeta& GetMeta() const {
+ return Meta;
+ }
+
+ TChunkAddress GetAddress() const {
+ return TChunkAddress(ColumnId, Chunk);
+ }
bool IsEqualTest(const TColumnRecord& item) const {
return ColumnId == item.ColumnId && Chunk == item.Chunk;
}
std::optional<ui32> GetChunkRowsCount() const {
- return {};
+ return Meta.GetNumRows();
}
bool Valid() const {
@@ -38,12 +79,9 @@ struct TColumnRecord {
return BlobRange.BlobId.IsValid() && BlobRange.Size;
}
- static TColumnRecord Make(ui32 columnId, ui16 chunk = 0) {
- TColumnRecord row;
- row.ColumnId = columnId;
- row.Chunk = chunk;
- return row;
- }
+ TColumnRecord(const TChunkAddress& address, const std::shared_ptr<arrow::Array>& column, const TIndexInfo& info);
+
+ TColumnRecord(const TColumnChunkLoadContext& loadContext, const TIndexInfo& info);
friend IOutputStream& operator << (IOutputStream& out, const TColumnRecord& rec) {
out << '{';
diff --git a/ydb/core/tx/columnshard/engines/portions/common.cpp b/ydb/core/tx/columnshard/engines/portions/common.cpp
new file mode 100644
index 0000000000..e18ca98033
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/common.cpp
@@ -0,0 +1,10 @@
+#include "common.h"
+#include <util/string/builder.h>
+
+namespace NKikimr::NOlap {
+
+TString TChunkAddress::DebugString() const {
+ return TStringBuilder() << "(column_id=" << ColumnId << ";chunk=" << Chunk << ";)";
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/portions/common.h b/ydb/core/tx/columnshard/engines/portions/common.h
new file mode 100644
index 0000000000..b0305fa53e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/common.h
@@ -0,0 +1,29 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap {
+
+class TChunkAddress {
+private:
+ YDB_READONLY(ui32, ColumnId, 0);
+ YDB_READONLY(ui16, Chunk, 0);
+public:
+ TChunkAddress(const ui32 columnId, const ui16 chunk)
+ : ColumnId(columnId)
+ , Chunk(chunk) {
+
+ }
+
+ bool operator<(const TChunkAddress& address) const {
+ return std::tie(ColumnId, Chunk) < std::tie(address.ColumnId, address.Chunk);
+ }
+
+ bool operator==(const TChunkAddress& address) const {
+ return std::tie(ColumnId, Chunk) == std::tie(address.ColumnId, address.Chunk);
+ }
+
+ TString DebugString() const;
+};
+
+
+}
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp
index ac2c17ad66..2aa1efd2ec 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp
@@ -1,5 +1,112 @@
#include "meta.h"
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <library/cpp/actors/core/log.h>
namespace NKikimr::NOlap {
+void TPortionMeta::FillBatchInfo(const std::shared_ptr<arrow::RecordBatch> batch, const TIndexInfo& indexInfo) {
+ {
+ auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
+ std::vector<bool> bits(batch->num_rows(), false);
+ bits[0] = true;
+ bits[batch->num_rows() - 1] = true; // it could be 0 if batch has one row
+
+ auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows());
+ auto res = arrow::compute::Filter(keyBatch, filter);
+ Y_VERIFY(res.ok());
+
+ ReplaceKeyEdges = res->record_batch();
+ Y_VERIFY(ReplaceKeyEdges->num_rows() == 1 || ReplaceKeyEdges->num_rows() == 2);
+ }
+
+ auto edgesBatch = NArrow::ExtractColumns(ReplaceKeyEdges, indexInfo.GetIndexKey());
+ IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
+ IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
+}
+
+bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) {
+ const bool compositeIndexKey = indexInfo.IsCompositeIndexKey();
+ if (Produced != TPortionMeta::EProduced::UNSPECIFIED) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication");
+ return false;
+ }
+ TierName = portionMeta.GetTierName();
+ if (portionMeta.GetIsInserted()) {
+ Produced = TPortionMeta::EProduced::INSERTED;
+ } else if (portionMeta.GetIsCompacted()) {
+ Produced = TPortionMeta::EProduced::COMPACTED;
+ } else if (portionMeta.GetIsSplitCompacted()) {
+ Produced = TPortionMeta::EProduced::SPLIT_COMPACTED;
+ } else if (portionMeta.GetIsEvicted()) {
+ Produced = TPortionMeta::EProduced::EVICTED;
+ } else {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")("meta", portionMeta.DebugString());
+ return false;
+ }
+
+ if (portionMeta.HasPrimaryKeyBorders()) {
+ ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
+ Y_VERIFY(ReplaceKeyEdges);
+ Y_VERIFY_DEBUG(ReplaceKeyEdges->ValidateFull().ok());
+ Y_VERIFY(ReplaceKeyEdges->num_rows() == 1 || ReplaceKeyEdges->num_rows() == 2);
+
+ if (compositeIndexKey) {
+ auto edgesBatch = NArrow::ExtractColumns(ReplaceKeyEdges, indexInfo.GetIndexKey());
+ Y_VERIFY(edgesBatch);
+ IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
+ IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
+ }
+ }
+ return true;
+}
+
+std::optional<NKikimrTxColumnShard::TIndexPortionMeta> TPortionMeta::SerializeToProto(const ui32 columnId, const ui32 chunk) const {
+ if (columnId != FirstPkColumn || chunk != 0) {
+ return {};
+ }
+
+ NKikimrTxColumnShard::TIndexPortionMeta portionMeta;
+ portionMeta.SetTierName(TierName);
+
+ switch (Produced) {
+ case TPortionMeta::EProduced::UNSPECIFIED:
+ Y_VERIFY(false);
+ case TPortionMeta::EProduced::INSERTED:
+ portionMeta.SetIsInserted(true);
+ break;
+ case TPortionMeta::EProduced::COMPACTED:
+ portionMeta.SetIsCompacted(true);
+ break;
+ case TPortionMeta::EProduced::SPLIT_COMPACTED:
+ portionMeta.SetIsSplitCompacted(true);
+ break;
+ case TPortionMeta::EProduced::EVICTED:
+ portionMeta.SetIsEvicted(true);
+ break;
+ case TPortionMeta::EProduced::INACTIVE:
+ Y_FAIL("Unexpected inactive case");
+ //portionMeta->SetInactive(true);
+ break;
+ }
+
+ if (const auto& keyEdgesBatch = ReplaceKeyEdges) {
+ Y_VERIFY(keyEdgesBatch);
+ Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok());
+ Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2);
+ portionMeta.SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch));
+ }
+ return portionMeta;
+}
+
+TString TPortionMeta::DebugString() const {
+ TStringBuilder sb;
+ sb << "(produced=" << Produced << ";";
+ if (TierName) {
+ sb << "tier_name=" << TierName << ";";
+ }
+ sb << ")";
+ return sb;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h
index 8095be85ca..8ab306a8ab 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/meta.h
@@ -1,68 +1,40 @@
#pragma once
#include <ydb/core/tx/columnshard/common/portion.h>
#include <ydb/core/formats/arrow/replace_key.h>
+#include <ydb/core/protos/tx_columnshard.pb.h>
#include <util/stream/output.h>
namespace NKikimr::NOlap {
+struct TIndexInfo;
+
struct TPortionMeta {
+private:
+ void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
+ std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows
+ YDB_ACCESSOR_DEF(TString, TierName);
+public:
using EProduced = NPortion::EProduced;
- struct TColumnMeta {
- ui32 NumRows{0};
- ui32 RawBytes{0};
- std::shared_ptr<arrow::Scalar> Min;
- std::shared_ptr<arrow::Scalar> Max;
-
- bool HasMinMax() const noexcept {
- return Min.get() && Max.get();
- }
- };
-
- EProduced GetProduced() const {
- return Produced;
- }
-
- EProduced Produced{EProduced::UNSPECIFIED};
- THashMap<ui32, TColumnMeta> ColumnMeta;
- ui32 FirstPkColumn = 0;
- std::shared_ptr<arrow::RecordBatch> ReplaceKeyEdges; // first and last PK rows
std::optional<NArrow::TReplaceKey> IndexKeyStart;
std::optional<NArrow::TReplaceKey> IndexKeyEnd;
+ EProduced Produced{EProduced::UNSPECIFIED};
+ ui32 FirstPkColumn = 0;
- TString DebugString() const {
- return TStringBuilder() <<
- "produced:" << Produced << ";"
- ;
- }
+ bool DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo);
+
+ std::optional<NKikimrTxColumnShard::TIndexPortionMeta> SerializeToProto(const ui32 columnId, const ui32 chunk) const;
- bool HasMinMax(ui32 columnId) const {
- if (!ColumnMeta.contains(columnId)) {
- return false;
- }
- return ColumnMeta.find(columnId)->second.HasMinMax();
- }
+ void FillBatchInfo(const std::shared_ptr<arrow::RecordBatch> batch, const TIndexInfo& indexInfo);
- bool HasPkMinMax() const {
- return HasMinMax(FirstPkColumn);
+ EProduced GetProduced() const {
+ return Produced;
}
- ui32 NumRows() const {
- if (FirstPkColumn) {
- Y_VERIFY(ColumnMeta.contains(FirstPkColumn));
- return ColumnMeta.find(FirstPkColumn)->second.NumRows;
- }
- return 0;
- }
+ TString DebugString() const;
friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) {
- out << "reason" << (ui32)info.Produced;
- for (const auto& [_, meta] : info.ColumnMeta) {
- if (meta.NumRows) {
- out << " " << meta.NumRows << " rows";
- break;
- }
- }
+ out << info.DebugString();
return out;
}
};
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 1f152049fc..df3abc327e 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -18,221 +18,52 @@ const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record)
}
}
if (maxChunk) {
- Y_VERIFY(*maxChunk + 1 == record.Chunk);
+ AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk);
} else {
- Y_VERIFY(0 == record.Chunk);
+ AFL_VERIFY(0 == record.Chunk)("record", record.Chunk);
}
Records.emplace_back(std::move(record));
return Records.back();
}
-void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
- Y_VERIFY(column->length());
-
- std::pair<int, int> minMaxPos = {0, (column->length() - 1)};
- if (!sorted) {
- minMaxPos = NArrow::FindMinMaxPosition(column);
- }
-
- Y_VERIFY(minMaxPos.first >= 0);
- Y_VERIFY(minMaxPos.second >= 0);
-
- Meta.ColumnMeta[columnId].Min = NArrow::GetScalar(column, minMaxPos.first);
- Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second);
-}
-
void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName) {
const auto& indexInfo = snapshotSchema.GetIndexInfo();
- const auto& minMaxColumns = indexInfo.GetMinMaxIdxColumns();
- TierName = tierName;
Meta = {};
Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
-
- // Copy first and last key rows into new batch to free source batch's memory
- {
- auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey());
- std::vector<bool> bits(batch->num_rows(), false);
- bits[0] = true;
- bits[batch->num_rows() - 1] = true; // it colud be 0 if batch has one row
-
- auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows());
- auto res = arrow::compute::Filter(keyBatch, filter);
- Y_VERIFY(res.ok());
-
- Meta.ReplaceKeyEdges = res->record_batch();
- Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2);
- }
-
- auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey());
- Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
- Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
-
- /// @note It does not add RawBytes info for snapshot columns, only for user ones.
- for (auto& [columnId, col] : indexInfo.Columns) {
- const auto& columnName = col.Name;
- auto column = batch->GetColumnByName(col.Name);
- Y_VERIFY(column);
- Meta.ColumnMeta[columnId].NumRows = column->length();
- Meta.ColumnMeta[columnId].RawBytes = NArrow::GetArrayDataSize(column);
-
- if (minMaxColumns.contains(columnId)) {
- auto column = batch->GetColumnByName(columnName);
- Y_VERIFY(column);
-
- const bool isSorted = (columnId == Meta.FirstPkColumn);
- AddMinMax(columnId, column, isSorted);
- Y_VERIFY(Meta.HasMinMax(columnId));
- }
- }
+ Meta.FillBatchInfo(batch, indexInfo);
+ Meta.SetTierName(tierName);
}
-TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const {
- NKikimrTxColumnShard::TIndexColumnMeta meta; // TODO: move proto serialization out of engines folder
- if (Meta.ColumnMeta.contains(rec.ColumnId)) {
- const auto& columnMeta = Meta.ColumnMeta.find(rec.ColumnId)->second;
- if (auto numRows = columnMeta.NumRows) {
- meta.SetNumRows(numRows);
- }
- if (auto rawBytes = columnMeta.RawBytes) {
- meta.SetRawBytes(rawBytes);
- }
- if (columnMeta.HasMinMax()) {
- ScalarToConstant(*columnMeta.Min, *meta.MutableMinValue());
- ScalarToConstant(*columnMeta.Max, *meta.MutableMaxValue());
- }
- }
-
- if (rec.ColumnId == Meta.FirstPkColumn) {
- auto* portionMeta = meta.MutablePortionMeta();
-
- switch (Meta.Produced) {
- case TPortionMeta::EProduced::UNSPECIFIED:
- Y_VERIFY(false);
- case TPortionMeta::EProduced::INSERTED:
- portionMeta->SetIsInserted(true);
- break;
- case TPortionMeta::EProduced::COMPACTED:
- portionMeta->SetIsCompacted(true);
- break;
- case TPortionMeta::EProduced::SPLIT_COMPACTED:
- portionMeta->SetIsSplitCompacted(true);
- break;
- case TPortionMeta::EProduced::EVICTED:
- portionMeta->SetIsEvicted(true);
- break;
- case TPortionMeta::EProduced::INACTIVE:
- Y_FAIL("Unexpected inactive case");
- //portionMeta->SetInactive(true);
- break;
- }
-
- if (!TierName.empty()) {
- portionMeta->SetTierName(TierName);
- }
-
- if (const auto& keyEdgesBatch = Meta.ReplaceKeyEdges) {
- Y_VERIFY(keyEdgesBatch);
- Y_VERIFY_DEBUG(keyEdgesBatch->ValidateFull().ok());
- Y_VERIFY(keyEdgesBatch->num_rows() == 1 || keyEdgesBatch->num_rows() == 2);
- portionMeta->SetPrimaryKeyBorders(NArrow::SerializeBatchNoCompression(keyEdgesBatch));
- }
- }
-
- TString out;
- Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out);
- return out;
-}
-
-void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec) {
- if (rec.Metadata.empty()) {
- return;
- }
-
- NKikimrTxColumnShard::TIndexColumnMeta meta;
- bool ok = meta.ParseFromString(rec.Metadata);
- Y_VERIFY(ok);
-
- Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
- auto field = indexInfo.ArrowColumnField(rec.ColumnId);
- const bool compositeIndexKey = indexInfo.IsCompositeIndexKey();
-
- if (meta.HasPortionMeta()) {
- Y_VERIFY_DEBUG(rec.ColumnId == Meta.FirstPkColumn);
-
- auto& portionMeta = meta.GetPortionMeta();
- TierName = portionMeta.GetTierName();
-
- if (portionMeta.GetIsInserted()) {
- Meta.Produced = TPortionMeta::EProduced::INSERTED;
- } else if (portionMeta.GetIsCompacted()) {
- Meta.Produced = TPortionMeta::EProduced::COMPACTED;
- } else if (portionMeta.GetIsSplitCompacted()) {
- Meta.Produced = TPortionMeta::EProduced::SPLIT_COMPACTED;
- } else if (portionMeta.GetIsEvicted()) {
- Meta.Produced = TPortionMeta::EProduced::EVICTED;
- }
-
- if (portionMeta.HasPrimaryKeyBorders()) {
- Meta.ReplaceKeyEdges = NArrow::DeserializeBatch(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
- Y_VERIFY(Meta.ReplaceKeyEdges);
- Y_VERIFY_DEBUG(Meta.ReplaceKeyEdges->ValidateFull().ok());
- Y_VERIFY(Meta.ReplaceKeyEdges->num_rows() == 1 || Meta.ReplaceKeyEdges->num_rows() == 2);
-
- if (compositeIndexKey) {
- auto edgesBatch = NArrow::ExtractColumns(Meta.ReplaceKeyEdges, indexInfo.GetIndexKey());
- Y_VERIFY(edgesBatch);
- Meta.IndexKeyStart = NArrow::TReplaceKey::FromBatch(edgesBatch, 0);
- Meta.IndexKeyEnd = NArrow::TReplaceKey::FromBatch(edgesBatch, edgesBatch->num_rows() - 1);
+std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
+ std::shared_ptr<arrow::Scalar> result;
+ for (auto&& i : Records) {
+ if (i.ColumnId == columnId) {
+ if (!i.GetMeta().GetMin()) {
+ return nullptr;
+ }
+ if (!result || NArrow::ScalarCompare(result, i.GetMeta().GetMin()) > 0) {
+ result = i.GetMeta().GetMin();
}
}
}
- if (meta.HasNumRows()) {
- Meta.ColumnMeta[rec.ColumnId].NumRows = meta.GetNumRows();
- }
- if (meta.HasRawBytes()) {
- Meta.ColumnMeta[rec.ColumnId].RawBytes = meta.GetRawBytes();
- }
- if (meta.HasMinValue()) {
- auto scalar = ConstantToScalar(meta.GetMinValue(), field->type());
- Meta.ColumnMeta[rec.ColumnId].Min = scalar;
-
- // Restore Meta.IndexKeyStart for one column IndexKey
- if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) {
- Meta.IndexKeyStart = NArrow::TReplaceKey::FromScalar(scalar);
- }
- }
- if (meta.HasMaxValue()) {
- auto scalar = ConstantToScalar(meta.GetMaxValue(), field->type());
- Meta.ColumnMeta[rec.ColumnId].Max = scalar;
-
- // Restore Meta.IndexKeyEnd for one column IndexKey
- if (!compositeIndexKey && rec.ColumnId == Meta.FirstPkColumn) {
- Meta.IndexKeyEnd = NArrow::TReplaceKey::FromScalar(scalar);
- }
- }
-
- // Portion genarated without PrimaryKeyBorders and loaded with indexInfo.IsCompositeIndexKey()
- // We should have no such portions for ForceColumnTablesCompositeMarks feature
- if (rec.ColumnId == Meta.FirstPkColumn) {
- Y_VERIFY(Meta.IndexKeyStart && Meta.IndexKeyEnd);
- }
-}
-
-std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
- if (!Meta.ColumnMeta.contains(columnId)) {
- return {};
- }
- return Meta.ColumnMeta.find(columnId)->second.Min;
+ return result;
}
std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
- auto it = Meta.ColumnMeta.find(columnId);
- if (it == Meta.ColumnMeta.end()) {
- return {};
+ std::shared_ptr<arrow::Scalar> result;
+ for (auto&& i : Records) {
+ if (i.ColumnId == columnId) {
+ if (!i.GetMeta().GetMax()) {
+ return nullptr;
+ }
+ if (!result || NArrow::ScalarCompare(result, i.GetMeta().GetMax()) < 0) {
+ result = i.GetMeta().GetMax();
+ }
+ }
}
- return it->second.Max;
+ return result;
}
TPortionInfo TPortionInfo::CopyWithFilteredColumns(const THashSet<ui32>& columnIds) const {
@@ -256,9 +87,10 @@ ui64 TPortionInfo::GetRawBytes(const std::vector<ui32>& columnIds) const {
if (TIndexInfo::IsSpecialColumn(i)) {
sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i);
} else {
- auto it = Meta.ColumnMeta.find(i);
- if (it != Meta.ColumnMeta.end()) {
- sum += it->second.RawBytes;
+ for (auto&& r : Records) {
+ if (r.ColumnId == i) {
+ sum += r.GetMeta().GetRawBytes();
+ }
}
}
}
@@ -283,14 +115,47 @@ TString TPortionInfo::DebugString() const {
if (RemoveSnapshot.Valid()) {
sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");";
}
- sb << "meta:(" << Meta << ");";
+ sb << "meta:(" << Meta.DebugString() << ");";
sb << "chunks:(" << Records.size() << ");";
- if (TierName) {
- sb << "tier:" << TierName << ";";
- }
return sb << ")";
}
+void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta) {
+ Records.push_back(rec);
+
+ if (portionMeta) {
+ Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
+ Y_VERIFY(Meta.DeserializeFromProto(*portionMeta, indexInfo));
+ }
+ if (!indexInfo.IsCompositeIndexKey() && indexInfo.GetPKFirstColumnId() == rec.ColumnId) {
+ if (rec.GetMeta().GetMin()) {
+ auto candidate = NArrow::TReplaceKey::FromScalar(rec.GetMeta().GetMin());
+ if (!Meta.IndexKeyStart || candidate < *Meta.IndexKeyStart) {
+ Meta.IndexKeyStart = candidate;
+ }
+ }
+ if (rec.GetMeta().GetMax()) {
+ auto candidate = NArrow::TReplaceKey::FromScalar(rec.GetMeta().GetMax());
+ if (!Meta.IndexKeyEnd || *Meta.IndexKeyEnd < candidate) {
+ Meta.IndexKeyEnd = candidate;
+ }
+ }
+ }
+}
+
+bool TPortionInfo::HasPkMinMax() const {
+ bool result = false;
+ for (auto&& i : Records) {
+ if (i.ColumnId == Meta.FirstPkColumn) {
+ if (!i.GetMeta().HasMinMax()) {
+ return false;
+ }
+ result = true;
+ }
+ }
+ return result;
+}
+
std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
Y_VERIFY(!Blobs.empty());
@@ -308,7 +173,7 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() c
std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
- std::vector< std::shared_ptr<arrow::Field>> fields;
+ std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto&& i : Columns) {
if (!options.IsAcceptedColumn(i.GetColumnId())) {
continue;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 455e102497..267e0d4570 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -17,16 +17,38 @@ private:
ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in granule
TSnapshot MinSnapshot = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion}
TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one)
+
+ bool HasPkMinMax() const;
+ TPortionMeta Meta;
public:
static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024;
+ void ResetMeta() {
+ Meta = TPortionMeta();
+ }
+
+ const TPortionMeta& GetMeta() const {
+ return Meta;
+ }
+
+ TPortionMeta& MutableMeta() {
+ return Meta;
+ }
+
std::vector<TColumnRecord> Records;
- TPortionMeta Meta;
- TString TierName;
+
+ const TColumnRecord* GetRecordPointer(const TChunkAddress& address) const {
+ for (auto&& i : Records) {
+ if (i.GetAddress() == address) {
+ return &i;
+ }
+ }
+ return nullptr;
+ }
bool Empty() const { return Records.empty(); }
bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; }
- bool Valid() const { return MinSnapshot.Valid() && Granule && Portion && !Empty() && Produced() && Meta.HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
+ bool Valid() const { return MinSnapshot.Valid() && Granule && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && Granule && Portion; }
bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; }
bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; }
@@ -50,7 +72,6 @@ public:
, Portion(portionId)
, MinSnapshot(minSnapshot)
{
-
}
TString DebugString() const;
@@ -153,21 +174,12 @@ public:
void UpdateRecordsMeta(TPortionMeta::EProduced produced) {
Meta.Produced = produced;
- for (auto& record : Records) {
- record.Metadata = GetMetadata(record);
- }
}
- void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec) {
- Records.push_back(rec);
- LoadMetadata(indexInfo, rec);
- }
+ void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta);
- TString GetMetadata(const TColumnRecord& rec) const;
- void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec);
void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName);
- void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
@@ -183,17 +195,35 @@ public:
}
ui32 NumRows() const {
- return Meta.NumRows();
+ ui32 result = 0;
+ std::optional<ui32> columnIdFirst;
+ for (auto&& i : Records) {
+ if (!columnIdFirst || *columnIdFirst == i.ColumnId) {
+ result += i.GetMeta().GetNumRows();
+ columnIdFirst = i.ColumnId;
+ }
+ }
+ return result;
+ }
+
+ ui32 NumRows(const ui32 columnId) const {
+ ui32 result = 0;
+ for (auto&& i : Records) {
+ if (columnId == i.ColumnId) {
+ result += i.GetMeta().GetNumRows();
+ }
+ }
+ return result;
}
ui64 GetRawBytes(const std::vector<ui32>& columnIds) const;
ui64 RawBytesSum() const {
- ui64 sum = 0;
- for (auto& [columnId, colMeta] : Meta.ColumnMeta) {
- sum += colMeta.RawBytes;
+ ui64 result = 0;
+ for (auto&& i : Records) {
+ result += i.GetMeta().GetRawBytes();
}
- return sum;
+ return result;
}
private:
@@ -354,8 +384,7 @@ public:
std::vector<TPreparedColumn> columns;
columns.reserve(resultSchema.GetSchema()->num_fields());
- Y_VERIFY(!Meta.ColumnMeta.empty());
- const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows;
+ const ui32 rowsCount = NumRows();
for (auto&& field : resultSchema.GetSchema()->fields()) {
columns.emplace_back(TPreparedColumn({ TAssembleBlobInfo(rowsCount) }, resultSchema.GetColumnLoader(field->name())));
}
@@ -372,10 +401,7 @@ public:
Y_ASSERT(pos >= 0);
positionsMap[resulPos] = pos;
Y_VERIFY(columnChunks[resulPos].emplace(rec.Chunk, rec.BlobRange).second);
- auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId);
- if (columnMeta) {
- Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows);
- }
+ Y_VERIFY_S(rowsCount == NumRows(rec.ColumnId), TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << NumRows(rec.ColumnId));
}
// Make chunked arrays for columns
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
index cc07cafb38..56d228ce93 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp
@@ -2,10 +2,10 @@
namespace NKikimr::NOlap {
-const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk) {
+const TColumnRecord& TPortionInfoWithBlobs::TBlobInfo::AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info) {
Y_VERIFY(!ResultBlob);
const ui32 columnId = chunk.GetColumnId();
- auto rec = TColumnRecord::Make(columnId, owner.ColumnChunkIds[columnId]++);
+ TColumnRecord rec(TChunkAddress(columnId, owner.ColumnChunkIds[columnId]++), chunk.GetColumn(), info);
rec.BlobRange.Offset = Size;
rec.BlobRange.Size = chunk.GetData().size();
auto& result = owner.PortionInfo.AppendOneChunkColumn(std::move(rec));
diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h
index 5dc70c0aac..80fb9cbb88 100644
--- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h
+++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h
@@ -33,7 +33,7 @@ public:
return *ResultBlob;
}
- const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk);
+ const TColumnRecord& AddChunk(TPortionInfoWithBlobs& owner, TOrderedColumnChunk&& chunk, const TIndexInfo& info);
void RegisterBlobId(TPortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId);
};
diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make
index 394c17d6ad..2af9b511fd 100644
--- a/ydb/core/tx/columnshard/engines/portions/ya.make
+++ b/ydb/core/tx/columnshard/engines/portions/ya.make
@@ -5,6 +5,7 @@ SRCS(
column_record.cpp
with_blobs.cpp
meta.cpp
+ common.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 7ad41d34bb..20f0e21a7e 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -167,6 +167,7 @@ public:
/// Returns whether the sorting keys defined.
bool IsSorted() const { return SortingKey.get(); }
+ bool IsSortedColumn(const ui32 columnId) const { return GetPKFirstColumnId() == columnId; }
/// Returns whether the replace keys defined.
bool IsReplacing() const { return ReplaceKey.get(); }
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 7cc5d05761..394c536013 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -28,7 +28,7 @@ ui64 TGranuleMeta::Size() const {
}
void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("granule", GetGranuleId());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("granule", GetGranuleId());
auto it = Portions.find(info.GetPortion());
AFL_VERIFY(info.GetGranule() == GetGranuleId())("event", "incompatible_granule")("portion", info.DebugString())("granule", GetGranuleId());
@@ -52,10 +52,10 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
bool TGranuleMeta::ErasePortion(const ui64 portion) {
auto it = Portions.find(portion);
if (it == Portions.end()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased_already")("portion_id", portion)("pathId", Record.PathId);
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased_already")("portion_id", portion)("pathId", Record.PathId);
return false;
} else {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second)("pathId", Record.PathId);
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "portion_erased")("portion_info", it->second)("pathId", Record.PathId);
}
OnBeforeChangePortion(&it->second, nullptr);
Portions.erase(it);
@@ -63,20 +63,20 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) {
return true;
}
-void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec) {
+void TGranuleMeta::AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta) {
auto it = Portions.find(portion.GetPortion());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "add_column_record")("portion_info", portion.DebugString())("record", rec.DebugString());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_column_record")("portion_info", portion.DebugString())("record", rec.DebugString());
if (it == Portions.end()) {
Y_VERIFY(portion.Records.empty());
auto portionNew = portion;
- portionNew.AddRecord(indexInfo, rec);
+ portionNew.AddRecord(indexInfo, rec, portionMeta);
OnBeforeChangePortion(nullptr, &portionNew);
Portions.emplace(portion.GetPortion(), std::move(portionNew));
OnAfterChangePortion();
} else {
Y_VERIFY(it->second.IsEqualWithSnapshots(portion));
auto portionNew = it->second;
- portionNew.AddRecord(indexInfo, rec);
+ portionNew.AddRecord(indexInfo, rec, portionMeta);
OnBeforeChangePortion(&it->second, &portionNew);
it->second = std::move(portionNew);
OnAfterChangePortion();
@@ -96,7 +96,7 @@ void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, cons
blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size;
}
for (auto&& i : blobIdSize) {
- PortionInfoGuard.OnDropBlob(portionBefore->IsActive() ? portionBefore->Meta.Produced : NPortion::EProduced::INACTIVE, i.second);
+ PortionInfoGuard.OnDropBlob(portionBefore->IsActive() ? portionBefore->GetMeta().Produced : NPortion::EProduced::INACTIVE, i.second);
}
}
if (portionAfter) {
@@ -105,7 +105,7 @@ void TGranuleMeta::OnBeforeChangePortion(const TPortionInfo* portionBefore, cons
blobIdSize[i.BlobRange.BlobId] += i.BlobRange.Size;
}
for (auto&& i : blobIdSize) {
- PortionInfoGuard.OnNewBlob(portionAfter->IsActive() ? portionAfter->Meta.Produced : NPortion::EProduced::INACTIVE, i.second);
+ PortionInfoGuard.OnNewBlob(portionAfter->IsActive() ? portionAfter->GetMeta().Produced : NPortion::EProduced::INACTIVE, i.second);
}
}
if (!!AdditiveSummaryCache) {
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 748453314e..2cb355e798 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -366,7 +366,7 @@ public:
const TGranuleRecord Record;
- void AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec);
+ void AddColumnRecord(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta);
const THashMap<ui64, TPortionInfo>& GetPortions() const {
return Portions;
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index c973a2cc9d..a4078db93b 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -1,8 +1,11 @@
-#include <library/cpp/testing/unittest/registar.h>
-#include <util/string/printf.h>
#include "db_wrapper.h"
#include "insert_table/insert_table.h"
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/string/printf.h>
+
namespace NKikimr {
using namespace NOlap;
@@ -30,7 +33,7 @@ public:
void WriteColumn(ui32, const TPortionInfo&, const TColumnRecord&) override {}
void EraseColumn(ui32, const TPortionInfo&, const TColumnRecord&) override {}
- bool LoadColumns(ui32, const std::function<void(const TPortionInfo&, const TColumnRecord&)>&) override { return true; }
+ bool LoadColumns(ui32, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>&) override { return true; }
void WriteCounter(ui32, ui32, ui64) override {}
bool LoadCounters(ui32, const std::function<void(ui32 id, ui64 value)>&) override { return true; }
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 4bb0810c1f..fb4c91d3f8 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -15,6 +15,8 @@ using TTypeInfo = NScheme::TTypeInfo;
namespace {
class TTestDbWrapper : public IDbWrapper {
+private:
+ std::map<TPortionAddress, std::map<TChunkAddress, TColumnChunkLoadContext>> LoadContexts;
public:
struct TIndex {
THashMap<ui64, std::vector<TGranuleRecord>> Granules; // pathId -> granule
@@ -103,7 +105,18 @@ public:
}
void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) override {
+ auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk);
+ auto rowProto = row.GetMeta().SerializeToProto();
+ if (proto) {
+ *rowProto.MutablePortionMeta() = std::move(*proto);
+ }
+
auto& data = Indices[index].Columns[portion.GetGranule()];
+ NOlap::TColumnChunkLoadContext loadContext(row.GetAddress(), row.BlobRange, rowProto);
+ auto itInsertInfo = LoadContexts[portion.GetAddress()].emplace(row.GetAddress(), loadContext);
+ if (!itInsertInfo.second) {
+ itInsertInfo.first->second = loadContext;
+ }
auto it = data.find(portion.GetPortion());
if (it == data.end()) {
it = data.emplace(portion.GetPortion(), portion.CopyWithFilteredColumns({})).first;
@@ -141,14 +154,18 @@ public:
portionLocal.Records.swap(filtered);
}
- bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnRecord&)>& callback) override {
+ bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override {
auto& columns = Indices[index].Columns;
for (auto& [granule, portions] : columns) {
for (auto& [portionId, portionLocal] : portions) {
auto copy = portionLocal;
+ copy.ResetMeta();
copy.Records.clear();
for (const auto& rec : portionLocal.Records) {
- callback(copy, rec);
+ auto itContextLoader = LoadContexts[copy.GetAddress()].find(rec.GetAddress());
+ Y_VERIFY(itContextLoader != LoadContexts[copy.GetAddress()].end());
+ callback(copy, itContextLoader->second);
+ LoadContexts[copy.GetAddress()].erase(itContextLoader);
}
}
}
diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp
index d5390412f8..fb6f7f6619 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.cpp
+++ b/ydb/core/tx/columnshard/splitter/chunks.cpp
@@ -1,4 +1,5 @@
#include "chunks.h"
+#include <ydb/core/formats/arrow/arrow_helpers.h>
namespace NKikimr::NOlap {
@@ -13,4 +14,8 @@ std::vector<TSplittedColumnChunk> TSplittedColumnChunk::InternalSplit(const TCol
return newChunks;
}
+TString TOrderedColumnChunk::DebugString() const {
+ return TStringBuilder() << "column_id=" << ColumnId << ";data_size=" << Data.size() << ";records_count=" << Column->length() << ";data=" << NArrow::DebugJson(Column, 3, 3) << ";";
+}
+
}
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index af5b28bacd..921e13229d 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -106,20 +106,27 @@ public:
class TOrderedColumnChunk {
private:
YDB_READONLY(ui32, ColumnId, 0);
- YDB_READONLY(ui32, RecordsCount, 0);
YDB_READONLY_DEF(TString, Data);
+ std::shared_ptr<arrow::Array> Column;
public:
- TOrderedColumnChunk(const ui32 columnId, const ui32 recordsCount, const TString& data)
+ std::shared_ptr<arrow::Array> GetColumn() const {
+ return Column;
+ }
+
+ ui32 GetRecordsCount() const {
+ return Column->length();
+ }
+
+ TOrderedColumnChunk(const ui32 columnId, const TString& data, std::shared_ptr<arrow::Array> column)
: ColumnId(columnId)
- , RecordsCount(recordsCount)
, Data(std::move(data))
+ , Column(column)
{
+ Y_VERIFY(Column);
}
- TString DebugString() const {
- return TStringBuilder() << "column_id=" << ColumnId << ";data_size=" << Data.size() << ";records_count=" << RecordsCount << ";";
- }
+ TString DebugString() const;
};
}
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
index a5210a72f2..7892859ff4 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp
@@ -91,7 +91,7 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<TOrderedColumnChunk>>& portio
}
std::vector<TOrderedColumnChunk> chunksForBlob;
for (auto&& c : i.GetChunks()) {
- chunksForBlob.emplace_back(c.GetColumnId(), c.GetData().GetRecordsCount(), c.GetData().GetSerializedChunk());
+ chunksForBlob.emplace_back(c.GetColumnId(), c.GetData().GetSerializedChunk(), c.GetData().GetColumn());
}
result.emplace_back(std::move(chunksForBlob));
}
diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h
index 54fba342be..4e41d4d6e7 100644
--- a/ydb/core/tx/columnshard/splitter/simple.h
+++ b/ydb/core/tx/columnshard/splitter/simple.h
@@ -11,6 +11,10 @@ private:
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, SlicedBatch);
YDB_READONLY_DEF(TString, SerializedChunk);
public:
+ std::shared_ptr<arrow::Array> GetColumn() const {
+ return SlicedBatch->column(0);
+ }
+
ui32 GetRecordsCount() const {
return SlicedBatch->num_rows();
}
@@ -19,7 +23,10 @@ public:
TSaverSplittedChunk(std::shared_ptr<arrow::RecordBatch> batch, TString&& serializedChunk)
: SlicedBatch(batch)
- , SerializedChunk(std::move(serializedChunk)) {
+ , SerializedChunk(std::move(serializedChunk))
+ {
+ Y_VERIFY(SlicedBatch);
+ Y_VERIFY(SlicedBatch->num_columns() == 1);
}
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index e748014c17..c0b18ff858 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -27,6 +27,7 @@ SRCS(
columnshard_impl.cpp
columnshard_common.cpp
columnshard_private_events.cpp
+ columnshard_schema.cpp
counters.cpp
compaction_actor.cpp
defs.cpp