aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-05-23 17:17:59 +0300
committernsofya <nsofya@yandex-team.com>2023-05-23 17:17:59 +0300
commit2e7ed60ae97dc177d2c964c338ac10bfbc6977cf (patch)
treed5acc13c0658bb89b445b7d739129bedbe61dfaf
parentedc61279de69023329a1f01c7e8b7f87d0f9ceab (diff)
downloadydb-2e7ed60ae97dc177d2c964c338ac10bfbc6977cf.tar.gz
Correct portion snapshot
1) Сохранение версии индекса в момент сохранения в InsertTable 2) Приведение блоков к максимальной по версии схеме при сохранении в индекс.
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp66
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp7
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp7
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h30
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h1
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h52
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.cpp90
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.h10
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h10
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.h38
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp36
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h19
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp100
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp8
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp7
20 files changed, 335 insertions, 189 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index cbf27bdccf8..e26ec97adff 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -4350,7 +4350,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
- void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates) {
+ void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function<void()> onBeforeCommit = {}, const EStatus opStatus = EStatus::SUCCESS) {
NLongTx::TLongTxBeginResult resBeginTx = LongTxClient.BeginWriteTx().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString());
@@ -4360,7 +4360,11 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
NLongTx::TLongTxWriteResult resWrite =
LongTxClient.Write(txId, table.GetName(), txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), opStatus, resWrite.Status().GetIssues().ToString());
+
+ if (onBeforeCommit) {
+ onBeforeCommit();
+ }
NLongTx::TLongTxCommitResult resCommitTx = LongTxClient.CommitTx(txId).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
@@ -4404,7 +4408,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
testHelper.CreateTable(testTable);
-
+
{
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
@@ -4430,7 +4434,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
auto columns = description.GetTableColumns();
UNIT_ASSERT_VALUES_EQUAL(columns.size(), 4);
}
-
+
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]");
testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=1", "[[#]]");
testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=1", "[[[\"test_res_1\"]]]");
@@ -4447,6 +4451,60 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest`", "[[#];[#];[[200u]]]");
}
+ // Writes a batch built with the pre-ALTER column list after a column has been added,
+ // and expects the write step to be answered with SCHEME_ERROR.
+ Y_UNIT_TEST(AddColumnOldScheme) {
+ TKikimrSettings settings;
+ settings.WithSampleTables = false;
+ TTestHelper helper(settings);
+
+ // Column list used both to create the table and to build the batch written below.
+ TVector<TTestHelper::TColumnSchema> initialColumns = {
+ TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+ TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
+ TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
+ };
+
+ TTestHelper::TColumnTable table;
+
+ table.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(initialColumns);
+ helper.CreateTable(table);
+ {
+ // Add the new column before any data is written.
+ auto ddl = TStringBuilder() << "ALTER TABLE `" << table.GetName() << "` ADD COLUMN new_column Uint64;";
+ auto ddlResult = helper.GetSession().ExecuteSchemeQuery(ddl).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(ddlResult.GetStatus(), EStatus::SUCCESS, ddlResult.GetIssues().ToString());
+ }
+ {
+ // The batch is still built from the initial column list.
+ TTestHelper::TUpdatesBuilder rows(table.GetArrowSchema(initialColumns));
+ rows.AddRow().Add(1).Add("test_res_1").AddNull();
+ helper.InsertData(table, rows, {}, EStatus::SCHEME_ERROR);
+ }
+ }
+
+ // Adds a column between the write and the commit of a long transaction and checks
+ // that the row written with the initial column list can still be read afterwards.
+ Y_UNIT_TEST(AddColumnOnSchemeChange) {
+ TKikimrSettings runnerSettings;
+ runnerSettings.WithSampleTables = false;
+ TTestHelper testHelper(runnerSettings);
+
+ TVector<TTestHelper::TColumnSchema> schema = {
+ TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+ TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
+ TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
+ };
+
+ TTestHelper::TColumnTable testTable;
+
+ testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTable);
+
+ {
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+ tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
+ // The callback runs after the write has been accepted and before the commit.
+ testHelper.InsertData(testTable, tableInserter, [&testTable, &testHelper]() {
+ auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;";
+ auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
+ // Without this check a failed ALTER would only show up as a mismatch in the read below.
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ });
+ }
+ // The expected row has four values, i.e. it includes the column added mid-transaction.
+ testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]");
+ }
+
Y_UNIT_TEST(AddColumnWithStore) {
TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 411cd360f2d..913b7056ce7 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -19,12 +19,12 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP
IndexedData.InitRead(batchNo);
// Add cached batches without read
for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, NOlap::TSnapshot::Zero() });
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId));
Y_VERIFY(!cmt.empty());
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.GetSnapshot());
+ IndexedData.AddNotIndexed(batchNo, batch, cmtBlob);
}
// Read all remained committed blobs
for (const auto& [cmtBlob, _] : WaitCommitted) {
@@ -44,11 +44,11 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data
if (IndexedData.IsIndexedBlob(blobRange)) {
IndexedData.AddIndexed(blobRange, data);
} else {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, NOlap::TSnapshot::Zero() });
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId));
Y_VERIFY(!cmt.empty());
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, data, cmtBlob.GetSnapshot());
+ IndexedData.AddNotIndexed(batchNo, data, cmtBlob);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 5ccca9c758c..33ee2942e37 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -65,7 +65,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time);
+ const auto& snapshotSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
+ NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time, snapshotSchema->GetSnapshot());
ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time);
@@ -221,7 +222,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
++WritesInFly; // write started
- ctx.Register(CreateWriteActor(TabletID(), TablesManager.GetIndexInfo(), ctx.SelfID,
+
+ const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
+ ctx.Register(CreateWriteActor(TabletID(), snapshotSchema->GetIndexInfo(), ctx.SelfID,
BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 40c3539ec55..fab45d3bdee 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -59,11 +59,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
bool ok = false;
if (Ev->Get()->PutStatus == NKikimrProto::OK) {
- NOlap::TSnapshot snapshot = changes->ApplySnapshot;
- if (snapshot.IsZero()) {
- snapshot = NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
- Y_VERIFY(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot);
- }
+ NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
+ Y_VERIFY(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot);
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index d81b47a627c..29ead9b7053 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -197,9 +197,11 @@ struct Schema : NIceDb::Schema {
struct DedupId : Column<5, NScheme::NTypeIds::String> {};
struct BlobId : Column<6, NScheme::NTypeIds::String> {};
struct Meta : Column<7, NScheme::NTypeIds::String> {};
+ struct IndexPlanStep : Column<8, NScheme::NTypeIds::Uint64> {};
+ struct IndexTxId : Column<9, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<Committed, ShardOrPlan, WriteTxId, PathId, DedupId>;
- using TColumns = TableColumns<Committed, ShardOrPlan, WriteTxId, PathId, DedupId, BlobId, Meta>;
+ using TColumns = TableColumns<Committed, ShardOrPlan, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId>;
};
struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
@@ -416,10 +418,19 @@ struct Schema : NIceDb::Schema {
// InsertTable activities
static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) {
- db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update(
- NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()),
- NIceDb::TUpdate<InsertTable::Meta>(data.Metadata)
- );
+ if (data.GetSchemaSnapshot().Valid()) { // valid schema snapshot: persist it next to the blob id and meta
+ db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update(
+ NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()),
+ NIceDb::TUpdate<InsertTable::Meta>(data.Metadata),
+ NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()), // plan step of the schema snapshot
+ NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId()) // tx id of the schema snapshot
+ );
+ } else { // no valid schema snapshot: IndexPlanStep/IndexTxId are not part of this update
+ db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update(
+ NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()),
+ NIceDb::TUpdate<InsertTable::Meta>(data.Metadata)
+ );
+ }
}
static void InsertTable_Erase(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) {
@@ -469,6 +480,13 @@ struct Schema : NIceDb::Schema {
TString strBlobId = rowset.GetValue<InsertTable::BlobId>();
TString metaStr = rowset.GetValue<InsertTable::Meta>();
+ std::optional<NOlap::TSnapshot> indexSnapshot;
+ if (rowset.HaveValue<InsertTable::IndexPlanStep>()) {
+ ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>();
+ ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>();
+ indexSnapshot = NOlap::TSnapshot(indexPlanStep, indexTxId);
+ }
+
TString error;
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error);
Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());
@@ -479,7 +497,7 @@ struct Schema : NIceDb::Schema {
writeTime = TInstant::Seconds(meta.GetDirtyWriteTimeSeconds());
}
- TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, metaStr, writeTime);
+ TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, metaStr, writeTime, indexSnapshot);
switch (recType) {
case EInsertTableIds::Inserted:
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 5669cf59061..007c4565dac 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -169,7 +169,6 @@ public:
EType Type;
TCompactionLimits Limits;
TSnapshot InitSnapshot = TSnapshot::Zero();
- TSnapshot ApplySnapshot = TSnapshot::Zero();
std::unique_ptr<TCompactionInfo> CompactionInfo;
std::vector<NOlap::TInsertedData> DataToIndex;
std::vector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index bfad74d8dea..05670517985 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -454,11 +454,8 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) {
Y_VERIFY(dataToIndex.size());
- auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(dataToIndex), limits);
+ auto changes = TChanges::BuildInsertChanges(DefaultMark(), std::move(dataToIndex), LastSnapshot, limits);
ui32 reserveGranules = 0;
-
- changes->InitSnapshot = LastSnapshot;
-
for (const auto& data : changes->DataToIndex) {
const ui64 pathId = data.PathId;
@@ -496,7 +493,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
Y_VERIFY(info);
Y_VERIFY(info->Granules.size() == 1);
- auto changes = std::make_shared<TChanges>(DefaultMark(), std::move(info), limits, LastSnapshot);
+ auto changes = TChanges::BuildCompactionChanges(DefaultMark(), std::move(info), limits, LastSnapshot);
const ui64 granule = *changes->CompactionInfo->Granules.begin();
const auto gi = Granules.find(granule);
@@ -541,7 +538,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
const TCompactionLimits& limits,
THashSet<ui64>& pathsToDrop,
ui32 maxRecords) {
- auto changes = std::make_shared<TChanges>(DefaultMark(), snapshot, limits);
+ auto changes = TChanges::BuildClenupChanges(DefaultMark(), snapshot, limits);
ui32 affectedRecords = 0;
// Add all portions from dropped paths
@@ -627,8 +624,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
return {};
}
- TSnapshot fakeSnapshot(1, 1); // TODO: better snapshot
- auto changes = std::make_shared<TChanges>(DefaultMark(), TColumnEngineChanges::TTL, fakeSnapshot);
+ auto changes = TChanges::BuildTtlChanges(DefaultMark());
ui64 evicttionSize = 0;
bool allowEviction = true;
ui64 dropBlobs = 0;
@@ -758,9 +754,6 @@ void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr
bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) {
auto changes = std::static_pointer_cast<TChanges>(indexChanges);
- if (changes->ApplySnapshot.Valid()) {
- Y_VERIFY(changes->ApplySnapshot == snapshot);
- }
const auto& indexInfo = GetIndexInfo();
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 93de430962e..c16a2397a6c 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -49,6 +49,11 @@ public:
};
class TChanges : public TColumnEngineChanges {
+ private:
+ /// Common constructor used by the static Build*Changes factories of this class.
+ /// Passes the change type and limits to the base class and stores the default mark.
+ TChanges(TColumnEngineChanges::EType type, const TMark& defaultMark, const TCompactionLimits& limits)
+ : TColumnEngineChanges(type, limits), DefaultMark(defaultMark)
+ {
+ }
+
public:
struct TSrcGranule {
ui64 PathId{0};
@@ -60,37 +65,34 @@ public:
{}
};
- TChanges(const TMark& defaultMark,
- std::vector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits)
- : TColumnEngineChanges(TColumnEngineChanges::INSERT, limits)
- , DefaultMark(defaultMark)
- {
- DataToIndex = std::move(blobsToIndex);
+ /// Creates an INSERT change set.
+ /// @param defaultMark  mark passed on to the TChanges constructor
+ /// @param blobsToIndex inserted data to index; its contents are moved into DataToIndex
+ /// @param initSnapshot value stored in InitSnapshot
+ /// @param limits       compaction limits passed on to the base class
+ static std::shared_ptr<TChanges> BuildInsertChanges(const TMark& defaultMark,
+ std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot,
+ const TCompactionLimits& limits) {
+ std::shared_ptr<TChanges> changes(new TChanges(TColumnEngineChanges::INSERT, defaultMark, limits));
+ // A named rvalue reference is an lvalue: without std::move the whole vector would be copied.
+ changes->DataToIndex = std::move(blobsToIndex);
+ changes->InitSnapshot = initSnapshot;
+ // Return the local by name; wrapping it in std::move would disable copy elision.
+ return changes;
}
- TChanges(const TMark& defaultMark,
- std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits, const TSnapshot& snapshot)
- : TColumnEngineChanges(TColumnEngineChanges::COMPACTION, limits)
- , DefaultMark(defaultMark)
- {
- CompactionInfo = std::move(info);
- InitSnapshot = snapshot;
+ /// Creates a COMPACTION change set.
+ /// @param defaultMark  mark passed on to the TChanges constructor
+ /// @param info         compaction description; ownership is moved into CompactionInfo
+ /// @param limits       compaction limits passed on to the base class
+ /// @param initSnapshot value stored in InitSnapshot
+ static std::shared_ptr<TChanges> BuildCompactionChanges(const TMark& defaultMark,
+ std::unique_ptr<TCompactionInfo>&& info,
+ const TCompactionLimits& limits,
+ const TSnapshot& initSnapshot) {
+ std::shared_ptr<TChanges> changes(new TChanges(TColumnEngineChanges::COMPACTION, defaultMark, limits));
+ changes->CompactionInfo = std::move(info);
+ changes->InitSnapshot = initSnapshot;
+ // Return the local by name; wrapping it in std::move would disable copy elision.
+ return changes;
}
- TChanges(const TMark& defaultMark,
- const TSnapshot& snapshot, const TCompactionLimits& limits)
- : TColumnEngineChanges(TColumnEngineChanges::CLEANUP, limits)
- , DefaultMark(defaultMark)
- {
- InitSnapshot = snapshot;
+ /// Creates a CLEANUP change set whose InitSnapshot is set to initSnapshot.
+ /// NOTE(review): "Clenup" is a misspelling of "Cleanup"; the name is kept because
+ /// TColumnEngineForLogs::StartCleanup calls this factory under the current name.
+ static std::shared_ptr<TChanges> BuildClenupChanges(const TMark& defaultMark, const TSnapshot& initSnapshot, const TCompactionLimits& limits) {
+ std::shared_ptr<TChanges> changes(new TChanges(TColumnEngineChanges::CLEANUP, defaultMark, limits));
+ changes->InitSnapshot = initSnapshot;
+ // Return the local by name; wrapping it in std::move would disable copy elision.
+ return changes;
}
- TChanges(const TMark& defaultMark,
- TColumnEngineChanges::EType type, const TSnapshot& applySnapshot)
- : TColumnEngineChanges(type, TCompactionLimits())
- , DefaultMark(defaultMark)
- {
- ApplySnapshot = applySnapshot;
+ /// Creates a TTL change set with default-constructed compaction limits.
+ /// InitSnapshot is not assigned here.
+ static std::shared_ptr<TChanges> BuildTtlChanges(const TMark& defaultMark) {
+ std::shared_ptr<TChanges> changes(new TChanges(TColumnEngineChanges::TTL, defaultMark, TCompactionLimits()));
+ // Return the local by name; wrapping it in std::move would disable copy elision.
+ return changes;
}
bool AddPathIfNotExists(ui64 pathId) {
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index d552fb3ee4e..8350be964ea 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -40,8 +40,8 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
TPortionInfo undo = portionInfo;
- auto blobSchema = IndexInfo.GetSchema(undo.GetSnapshot());
- auto resultSchema = IndexInfo.GetLastSchema();
+ auto blobSchema = SchemaVersions.GetSchema(undo.GetSnapshot());
+ auto resultSchema = SchemaVersions.GetLastSchema();
auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs);
size_t undoSize = newBlobs.size();
@@ -74,11 +74,11 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathId,
const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 granule,
- const TSnapshot& minSnapshot,
+ const TSnapshot& snapshot,
std::vector<TString>& blobs) const {
Y_VERIFY(batch->num_rows());
- auto resultSchema = IndexInfo.GetSchema(minSnapshot);
+ auto resultSchema = SchemaVersions.GetSchema(snapshot);
std::vector<TPortionInfo> out;
TString tierName;
@@ -111,7 +111,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(name);
/// @warnign records are not valid cause of empty BlobId and zero Portion
- TColumnRecord record = TColumnRecord::Make(granule, columnId, minSnapshot, 0);
+ TColumnRecord record = TColumnRecord::Make(granule, columnId, snapshot, 0);
auto columnSaver = resultSchema->GetColumnSaver(name, saverContext);
auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record), columnSaver);
if (!blob.size()) {
@@ -143,20 +143,24 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
return out;
}
-std::vector<std::shared_ptr<arrow::RecordBatch>> TCompactionLogic::PortionsToBatches(const std::vector<TPortionInfo>& portions,
+/// Assembles portions into record batches laid out by the last schema.
+/// Every portion is assembled; with insertedOnly set, only portions for which
+/// IsInserted() is true are kept in the result.
+/// @return the kept batches and the greatest snapshot among the last schema's
+///         snapshot and the snapshots of the kept portions.
+std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> TCompactionLogic::PortionsToBatches(const std::vector<TPortionInfo>& portions,
const THashMap<TBlobRange, TString>& blobs,
bool insertedOnly) const {
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
batches.reserve(portions.size());
- auto resultSchema = IndexInfo.GetLastSchema();
+ auto resultSchema = SchemaVersions.GetLastSchema();
+ TSnapshot maxSnapshot = resultSchema->GetSnapshot();
for (auto& portionInfo : portions) {
- auto blobSchema = IndexInfo.GetSchema(portionInfo.GetSnapshot());
+ auto blobSchema = SchemaVersions.GetSchema(portionInfo.GetSnapshot());
auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs);
if (!insertedOnly || portionInfo.IsInserted()) {
batches.push_back(batch);
+ if (maxSnapshot < portionInfo.GetSnapshot()) {
+ maxSnapshot = portionInfo.GetSnapshot();
+ }
}
}
- return batches;
+ // Move the vector into the pair: passing the lvalue would copy every element.
+ return std::make_pair(std::move(batches), maxSnapshot);
}
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> TIndexLogicBase::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
@@ -208,36 +212,49 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
Y_VERIFY(changes->AppendedPortions.empty());
- TSnapshot minSnapshot = changes->ApplySnapshot;
+ auto maxSnapshot = TSnapshot::Zero();
for (auto& inserted : changes->DataToIndex) {
TSnapshot insertSnap = inserted.GetSnapshot();
Y_VERIFY(insertSnap.Valid());
- if (minSnapshot.IsZero() || insertSnap <= minSnapshot) {
- minSnapshot = insertSnap;
+ if (insertSnap > maxSnapshot) {
+ maxSnapshot = insertSnap;
}
}
- Y_VERIFY(minSnapshot.Valid());
- auto& indexInfo = IndexInfo.GetSchema(minSnapshot)->GetIndexInfo();
- Y_VERIFY(indexInfo.IsSorted());
+ Y_VERIFY(maxSnapshot.Valid());
+
+ auto resultSchema = SchemaVersions.GetSchema(maxSnapshot);
+ Y_VERIFY(resultSchema->GetIndexInfo().IsSorted());
THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
for (auto& inserted : changes->DataToIndex) {
TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize());
+ auto blobSchema = SchemaVersions.GetSchema(inserted.GetSchemaSnapshot());
+ auto& indexInfo = blobSchema->GetIndexInfo();
+ Y_VERIFY(indexInfo.IsSorted());
+
std::shared_ptr<arrow::RecordBatch> batch;
if (auto it = changes->CachedBlobs.find(inserted.BlobId); it != changes->CachedBlobs.end()) {
batch = it->second;
} else if (auto* blobData = changes->Blobs.FindPtr(blobRange)) {
Y_VERIFY(!blobData->empty(), "Blob data not present");
+ // Prepare batch
batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema());
+ if (!batch) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)
+ ("event", "cannot_parse")
+ ("data_snapshot", TStringBuilder() << inserted.GetSnapshot())
+ ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot());
+ }
} else {
Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str());
}
Y_VERIFY(batch);
- batch = AddSpecials(batch, indexInfo, inserted);
+ batch = resultSchema->NormalizeFullBatch(*blobSchema, batch);
+ batch = AddSpecials(batch, resultSchema->GetIndexInfo(), inserted);
pathBatches[inserted.PathId].push_back(batch);
- Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), indexInfo.GetReplaceKey()));
+ Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey()));
}
std::vector<TString> blobs;
@@ -250,15 +267,15 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
Y_VERIFY(merged);
Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey()));
#else
- auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription());
+ auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
Y_VERIFY(merged);
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey()));
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, resultSchema->GetIndexInfo().GetReplaceKey()));
#endif
- auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo);
+ auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], resultSchema->GetIndexInfo());
for (auto& [granule, batch] : granuleBatches) {
- auto portions = MakeAppendedPortions(pathId, batch, granule, minSnapshot, blobs);
+ auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -270,25 +287,30 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
return blobs;
}
-std::shared_ptr<arrow::RecordBatch> TCompactionLogic::CompactInOneGranule(ui64 granule,
+/// Assembles all portions of one granule by the last schema and combines them
+/// into a single batch using the schema's sort/replace description.
+/// @return the combined batch and the greatest snapshot among the last schema's
+///         snapshot and the snapshots of the given portions.
+std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TCompactionLogic::CompactInOneGranule(ui64 granule,
const std::vector<TPortionInfo>& portions,
const THashMap<TBlobRange, TString>& blobs) const {
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
batches.reserve(portions.size());
- auto resultSchema = IndexInfo.GetLastSchema();
+ auto resultSchema = SchemaVersions.GetLastSchema();
+
+ TSnapshot maxSnapshot = resultSchema->GetSnapshot();
for (auto& portionInfo : portions) {
Y_VERIFY(!portionInfo.Empty());
Y_VERIFY(portionInfo.Granule() == granule);
- auto blobSchema = IndexInfo.GetSchema(portionInfo.GetSnapshot());
+ auto blobSchema = SchemaVersions.GetSchema(portionInfo.GetSnapshot());
auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs);
batches.push_back(batch);
+ if (portionInfo.GetSnapshot() > maxSnapshot) {
+ maxSnapshot = portionInfo.GetSnapshot();
+ }
}
auto sortedBatch = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, resultSchema->GetIndexInfo().GetReplaceKey()));
- return sortedBatch;
+ // Hand the pointer over instead of copying it.
+ return std::make_pair(std::move(sortedBatch), maxSnapshot);
}
std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const {
@@ -298,9 +320,9 @@ std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnE
Y_VERIFY(switchedProtions.size());
ui64 granule = switchedProtions[0].Granule();
- auto batch = CompactInOneGranule(granule, switchedProtions, changes->Blobs);
+ auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedProtions, changes->Blobs);
- auto resultSchema = IndexInfo.GetLastSchema();
+ auto resultSchema = SchemaVersions.GetLastSchema();
std::vector<TPortionInfo> portions;
if (!changes->MergeBorders.Empty()) {
Y_VERIFY(changes->MergeBorders.GetOrderedMarks().size() > 1);
@@ -311,13 +333,13 @@ std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnE
if (!slice || slice->num_rows() == 0) {
continue;
}
- auto tmp = MakeAppendedPortions(pathId, slice, granule, TSnapshot::Zero(), blobs);
+ auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs);
for (auto&& portionInfo : tmp) {
portions.emplace_back(std::move(portionInfo));
}
}
} else {
- portions = MakeAppendedPortions(pathId, batch, granule, TSnapshot::Zero(), blobs);
+ portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs);
}
Y_VERIFY(portions.size() > 0);
@@ -558,13 +580,11 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
std::vector<std::pair<TMark, ui64>> tsIds;
ui64 movedRows = TryMovePortions(ts0, portions, tsIds, changes->PortionsToMove);
- const auto& srcBatches = PortionsToBatches(portions, changes->Blobs, movedRows != 0);
+ auto [srcBatches, maxSnapshot] = PortionsToBatches(portions, changes->Blobs, movedRows != 0);
Y_VERIFY(srcBatches.size() == portions.size());
std::vector<TString> blobs;
-
- auto resultSchema = IndexInfo.GetLastSchema();
-
+ auto resultSchema = SchemaVersions.GetLastSchema();
if (movedRows) {
Y_VERIFY(changes->PortionsToMove.size() >= 2);
Y_VERIFY(changes->PortionsToMove.size() == tsIds.size());
@@ -652,7 +672,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
for (const auto& batch : idBatches[id]) {
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot::Zero(), blobs);
+ auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs);
Y_VERIFY(newPortions.size() > 0);
for (auto& portion : newPortions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -668,7 +688,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
ui64 tmpGranule = changes->SetTmpGranule(pathId, ts);
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot::Zero(), blobs);
+ auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h
index 91471217ee3..b13a84de6d0 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.h
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h
@@ -8,19 +8,19 @@ namespace NKikimr::NOlap {
class TIndexLogicBase {
protected:
- const TVersionedIndex& IndexInfo;
+ const TVersionedIndex& SchemaVersions;
private:
const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr;
public:
TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap)
- : IndexInfo(indexInfo)
+ : SchemaVersions(indexInfo)
, TieringMap(&tieringMap)
{
}
TIndexLogicBase(const TVersionedIndex& indexInfo)
- : IndexInfo(indexInfo)
+ : SchemaVersions(indexInfo)
{
}
@@ -72,7 +72,7 @@ public:
private:
std::vector<TString> CompactSplitGranule(const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) const;
std::vector<TString> CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const;
- std::shared_ptr<arrow::RecordBatch> CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) const;
+ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) const;
/// @return vec({ts, batch}). ts0 <= ts1 <= ... <= tsN
/// @note We use ts from PK for split but there could be lots of PKs with the same ts.
@@ -90,7 +90,7 @@ private:
std::vector<std::pair<TMark, ui64>>& tsIds,
std::vector<std::pair<TPortionInfo, ui64>>& toMove) const;
- std::vector<std::shared_ptr<arrow::RecordBatch>> PortionsToBatches(const std::vector<TPortionInfo>& portions,
+ std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> PortionsToBatches(const std::vector<TPortionInfo>& portions,
const THashMap<TBlobRange, TString>& blobs,
bool insertedOnly = false) const;
};
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index edccbfcad1f..cb95e0b13d4 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -188,19 +188,20 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
// Extract columns (without check), filter, attach snapshot, extract columns with check
// (do not filter snapshot columns)
- auto loadSchema = ReadMetadata->GetLoadSchema(snapshot);
+ auto dataSchema = ReadMetadata->GetLoadSchema(snapshot);
- auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema->GetSchema());
+ auto batch = NArrow::ExtractExistedColumns(srcBatch, dataSchema->GetSchema());
Y_VERIFY(batch);
-
+
auto filter = FilterNotIndexed(batch, *ReadMetadata);
if (filter.IsTotalDenyFilter()) {
return nullptr;
}
auto preparedBatch = batch;
-
preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, snapshot);
- preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema->GetSchema());
+ auto resultSchema = ReadMetadata->GetLoadSchema();
+ preparedBatch = resultSchema->NormalizeBatch(*dataSchema, preparedBatch);
+ preparedBatch = NArrow::ExtractColumns(preparedBatch, resultSchema->GetSchema());
Y_VERIFY(preparedBatch);
filter.Apply(preparedBatch);
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 1673d49808b..52208cfd348 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -65,16 +65,16 @@ public:
/// @returns batches and corresponding last keys in correct order (i.e. sorted by PK)
std::vector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
- void AddNotIndexed(ui32 batchNo, TString blob, const TSnapshot& snapshot) {
- auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(snapshot));
- AddNotIndexed(batchNo, batch, snapshot);
+ void AddNotIndexed(ui32 batchNo, TString blob, const TCommittedBlob& commitedBlob) {
+ auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(commitedBlob.GetSchemaSnapshot()));
+ AddNotIndexed(batchNo, batch, commitedBlob);
}
- void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) {
+ void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, const TCommittedBlob& commitedBlob) {
Y_VERIFY(batchNo < NotIndexed.size());
Y_VERIFY(!NotIndexed[batchNo]);
++ReadyNotIndexed;
- NotIndexed[batchNo] = MakeNotIndexedBatch(batch, snapshot);
+ NotIndexed[batchNo] = MakeNotIndexedBatch(batch, commitedBlob.GetSchemaSnapshot());
}
void AddIndexed(const TBlobRange& blobRange, const TString& column);
diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp
index b9a800f510b..50f15ace780 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table.cpp
@@ -198,7 +198,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna
for (const auto& data : *committed) {
if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) {
- ret.emplace_back(TCommittedBlob{data.BlobId, data.GetSnapshot()});
+ ret.emplace_back(TCommittedBlob(data.BlobId, data.GetSnapshot(), data.GetSchemaSnapshot()));
}
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h
index 93357ffbf2e..2e9ddd5f097 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.h
+++ b/ydb/core/tx/columnshard/engines/insert_table.h
@@ -7,6 +7,7 @@
namespace NKikimr::NOlap {
struct TInsertedData {
+public:
ui64 ShardOrPlan = 0;
ui64 WriteTxId = 0;
ui64 PathId = 0;
@@ -18,15 +19,19 @@ struct TInsertedData {
TInsertedData() = delete; // avoid invalid TInsertedData anywhere
TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
- const TString& meta, const TInstant& writeTime)
+ const TString& meta, const TInstant& writeTime, const std::optional<TSnapshot>& schemaVersion)
: ShardOrPlan(shardOrPlan)
, WriteTxId(writeTxId)
, PathId(pathId)
, DedupId(dedupId)
, BlobId(blobId)
, Metadata(meta)
- , DirtyTime(writeTime)
- {}
+ , DirtyTime(writeTime) {
+ if (schemaVersion) {
+ SchemaVersion = *schemaVersion;
+ Y_VERIFY(SchemaVersion.Valid());
+ }
+ }
bool operator < (const TInsertedData& key) const {
if (ShardOrPlan < key.ShardOrPlan) {
@@ -84,29 +89,46 @@ struct TInsertedData {
return TSnapshot(ShardOrPlan, WriteTxId);
}
+ const TSnapshot& GetSchemaSnapshot() const { // Schema version passed to the constructor; stays TSnapshot::Zero() when none was supplied.
+ return SchemaVersion;
+ }
+
ui32 BlobSize() const { return BlobId.BlobSize(); }
+
+private:
+ TSnapshot SchemaVersion = TSnapshot::Zero();
};
class TCommittedBlob {
private:
TUnifiedBlobId BlobId;
- TSnapshot Snapshot;
+ TSnapshot CommitSnapshot;
+ TSnapshot SchemaSnapshot;
public:
- TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot)
+ TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot)
: BlobId(blobId)
- , Snapshot(snapshot)
+ , CommitSnapshot(snapshot)
+ , SchemaSnapshot(schemaSnapshot)
{}
+    /// Builds a lookup-only key for containers keyed by TCommittedBlob.
+    /// Equality and Hash() depend on BlobId alone, so both snapshots are left as TSnapshot::Zero().
+    static TCommittedBlob BuildKeyBlob(const TUnifiedBlobId& blobId) {
+        const TSnapshot zeroSnapshot = TSnapshot::Zero();
+        return TCommittedBlob(blobId, zeroSnapshot, zeroSnapshot);
+    }
+
/// It uses a trick: we place a key with planStep:txId in the container and find it later by BlobId only.
/// So hash() and equality should depend on BlobId only.
bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; }
ui64 Hash() const noexcept { return BlobId.Hash(); }
TString DebugString() const {
- return TStringBuilder() << BlobId << ";ps=" << Snapshot.GetPlanStep() << ";ti=" << Snapshot.GetTxId();
+ return TStringBuilder() << BlobId << ";ps=" << CommitSnapshot.GetPlanStep() << ";ti=" << CommitSnapshot.GetTxId();
}
const TSnapshot& GetSnapshot() const {
- return Snapshot;
+ return CommitSnapshot;
+ }
+
+ const TSnapshot& GetSchemaSnapshot() const { // Schema snapshot given to the constructor; separate from the commit snapshot returned by GetSnapshot().
+ return SchemaSnapshot;
}
const TUnifiedBlobId& GetBlobId() const {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 4bd2230243b..2371b84e821 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -4,6 +4,42 @@
namespace NKikimr::NOlap {
+/// Normalizes `batch`, written under `dataSchema`, to the arrow schema of this snapshot's index info
+/// (GetIndexInfo().ArrowSchema()). NormalizeBatch does the same against GetSchema() instead.
+/// @param dataSchema schema the batch was produced with
+/// @param batch      data to convert
+/// @return result of NormalizeBatchImpl for the index-info arrow schema
+std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeFullBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const {
+    return NormalizeBatchImpl(dataSchema, batch, GetIndexInfo().ArrowSchema());
+}
+
+/// Normalizes `batch`, written under `dataSchema`, to the arrow schema returned by GetSchema().
+/// @param dataSchema schema the batch was produced with
+/// @param batch      data to convert
+/// @return result of NormalizeBatchImpl for GetSchema()
+std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const {
+    const std::shared_ptr<arrow::Schema>& targetSchema = GetSchema();
+    return NormalizeBatchImpl(dataSchema, batch, targetSchema);
+}
+
+/// Re-shapes `batch`, which was written under `dataSchema`, into the column layout of `resultArrowSchema`.
+/// - If both schemas carry the same snapshot, `batch` is returned unchanged.
+/// - Otherwise `dataSchema` must be older than this schema (verified). Every result field known to
+///   `dataSchema` is taken from `batch` by its old name; every other field gets a column produced
+///   by NArrow::MakeEmptyBatch for batch->num_rows() rows.
+/// @param dataSchema        schema the batch was produced with
+/// @param batch             data to convert
+/// @param resultArrowSchema arrow schema the returned batch must follow
+/// @return a record batch with `resultArrowSchema` and batch->num_rows() rows
+std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatchImpl(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch,
+    const std::shared_ptr<arrow::Schema>& resultArrowSchema) const {
+    if (dataSchema.GetSnapshot() == GetSnapshot()) {
+        return batch;
+    }
+    Y_VERIFY(dataSchema.GetSnapshot() < GetSnapshot());
+    const auto& resultFields = resultArrowSchema->fields();
+    // Sized up front (null entries); each slot is assigned exactly once by the loop below.
+    std::vector<std::shared_ptr<arrow::Array>> newColumns(resultFields.size());
+
+    for (size_t i = 0; i < resultFields.size(); ++i) {
+        const auto& field = resultFields[i];
+        // Columns are matched by id; the lookup in `batch` then uses the old schema's field name.
+        const auto columnId = GetIndexInfo().GetColumnId(field->name());
+        const auto oldColumnIndex = dataSchema.GetFieldIndex(columnId);
+        if (oldColumnIndex >= 0) { // column exists in the old schema
+            auto oldColumnInfo = dataSchema.GetField(oldColumnIndex);
+            Y_VERIFY(oldColumnInfo);
+            auto columnData = batch->GetColumnByName(oldColumnInfo->name());
+            Y_VERIFY(columnData);
+            newColumns[i] = columnData;
+        } else { // column is absent in the old schema: add a null column
+            auto nullColumn = NArrow::MakeEmptyBatch(arrow::schema({field}), batch->num_rows());
+            newColumns[i] = nullColumn->column(0);
+        }
+    }
+    return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns);
+}
+
TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
const TColumnSaver saver) {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 00bb37bb169..a263be80b69 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -37,6 +37,11 @@ public:
virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0;
virtual const TIndexInfo& GetIndexInfo() const = 0;
virtual const TSnapshot& GetSnapshot() const = 0;
+
+ std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const;
+ std::shared_ptr<arrow::RecordBatch> NormalizeFullBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const;
+private:
+ std::shared_ptr<arrow::RecordBatch> NormalizeBatchImpl(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch, const std::shared_ptr<arrow::Schema>& newSchema) const;
};
class TSnapshotSchema: public ISnapshotSchema {
@@ -65,7 +70,10 @@ public:
}
virtual int GetFieldIndex(const ui32 columnId) const override {
- TString columnName = IndexInfo.GetColumnName(columnId);
+ TString columnName = IndexInfo.GetColumnName(columnId, false);
+ if (!columnName) {
+ return -1;
+ }
std::string name(columnName.data(), columnName.size());
return Schema->GetFieldIndex(name);
}
@@ -128,7 +136,10 @@ public:
if (!ColumnIds.contains(columnId)) {
return -1;
}
- TString columnName = OriginalSnapshot->GetIndexInfo().GetColumnName(columnId);
+ TString columnName = OriginalSnapshot->GetIndexInfo().GetColumnName(columnId, false);
+ if (!columnName) {
+ return -1;
+ }
std::string name(columnName.data(), columnName.size());
return Schema->GetFieldIndex(name);
}
@@ -290,11 +301,9 @@ struct TPortionInfo {
}
void UpdateRecords(ui64 portion, const THashMap<ui64, ui64>& granuleRemap, const TSnapshot& snapshot) {
+ Y_UNUSED(snapshot);;
for (auto& rec : Records) {
rec.Portion = portion;
- if (!rec.ValidSnapshot()) {
- rec.SetSnapshot(snapshot);
- }
}
if (!granuleRemap.empty()) {
for (auto& rec : Records) {
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index edc295a6ae4..3688a52e2cd 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -51,19 +51,20 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
TTestInsertTableDB dbTable;
TInsertTable insertTable;
+ TSnapshot indexSnapshot(1, 1);
// insert, not committed
TInstant time = TInstant::Now();
- bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time));
+ bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot));
UNIT_ASSERT(ok);
// insert the same blobId1 again
- ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time));
+ ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot));
UNIT_ASSERT(!ok);
// insert a different blobId with the same writeId and dedupId
TUnifiedBlobId blobId2(2222, 1, 2, 100, 1);
- ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, time));
+ ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, time, indexSnapshot));
UNIT_ASSERT(!ok);
// read nothing
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 155a8292ea4..e85dd848ec7 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -289,16 +289,6 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
return engine.ApplyChanges(db, changes, snap);
}
-bool Insert(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap,
- std::vector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) {
- TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(snap, TIndexInfo(tableInfo));
- THashSet<TUnifiedBlobId> lostBlobs;
- engine.Load(db, lostBlobs);
-
- return Insert(engine, db, snap, std::move(dataToIndex), blobs, step);
-}
-
struct TExpected {
ui32 SrcPortions;
ui32 NewPortions;
@@ -327,15 +317,6 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
return engine.ApplyChanges(db, changes, snap);
}
-bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange,
- TString>&& blobs, ui32& step, const TExpected& expected) {
- TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
- THashSet<TUnifiedBlobId> lostBlobs;
- engine.Load(db, lostBlobs);
- return Compact(engine, db, snap, std::move(blobs), step, expected);
-}
-
bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, ui32 expectedToDrop) {
THashSet<ui64> pathsToDrop;
std::shared_ptr<TColumnEngineChanges> changes = engine.StartCleanup(snap, TestLimits(), pathsToDrop, 1000);
@@ -351,7 +332,7 @@ bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db,
UNIT_ASSERT(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop);
- return engine.ApplyChanges(db, changes, changes->ApplySnapshot);
+ return engine.ApplyChanges(db, changes, TSnapshot(1,0));
}
std::shared_ptr<TPredicate> MakePredicate(int64_t ts, NArrow::EOperation op) {
@@ -389,9 +370,17 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
TInstant writeTime = TInstant::Now();
+
+ // load
+ TColumnEngineForLogs engine(0);
+ TSnapshot indexSnaphot(1, 1);
+ engine.UpdateDefaultSchema(indexSnaphot, TIndexInfo(tableInfo));
+ THashSet<TUnifiedBlobId> lostBlobs;
+ engine.Load(db, lostBlobs);
+
std::vector<TInsertedData> dataToIndex = {
- {1, 2, paths[0], "", blobRanges[0].BlobId, "", writeTime},
- {2, 1, paths[0], "", blobRanges[1].BlobId, "", writeTime}
+ {1, 2, paths[0], "", blobRanges[0].BlobId, "", writeTime, indexSnaphot},
+ {2, 1, paths[0], "", blobRanges[1].BlobId, "", writeTime, indexSnaphot}
};
// write
@@ -400,18 +389,13 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
THashMap<TBlobRange, TString> blobs;
blobs[blobRanges[0]] = testBlob;
blobs[blobRanges[1]] = testBlob;
- Insert(tableInfo, db, TSnapshot(1, 2), std::move(dataToIndex), blobs, step);
-
- // load
-
- TColumnEngineForLogs engine(0);
- engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
- THashSet<TUnifiedBlobId> lostBlobs;
- engine.Load(db, lostBlobs);
-
+ Insert(engine, db, TSnapshot(1, 2), std::move(dataToIndex), blobs, step);
+
// selects
- const TIndexInfo& indexInfo = engine.GetIndexInfo();
+ auto lastSchema = engine.GetVersionedIndex().GetLastSchema();
+ UNIT_ASSERT_EQUAL(lastSchema->GetSnapshot(), indexSnaphot);
+ const TIndexInfo& indexInfo = lastSchema->GetIndexInfo();
THashSet<ui32> oneColumnId = { indexInfo.GetColumnId(testColumns[0].first) };
THashSet<ui32> columnIds;
for (auto& [column, typeId] : testColumns) {
@@ -426,13 +410,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
- { // select from snap after insert (greater txId)
+ { // select from snap between insert (greater txId)
ui64 planStep = 1;
ui64 txId = 2;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions[0].NumRecords(), columnIds.size());
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
{ // select from snap after insert (greater planStep)
@@ -473,6 +456,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TTestDbWrapper db;
TIndexInfo tableInfo = TestTableInfo(ydbSchema, key);
+ TSnapshot indexSnapshot(1, 1);
+ TColumnEngineForLogs engine(0, TestLimits());
+ engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
+ THashSet<TUnifiedBlobId> lostBlobs;
+ engine.Load(db, lostBlobs);
+
ui64 pathId = 1;
ui32 step = 1000;
@@ -490,25 +479,18 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
+ TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot});
- bool ok = Insert(tableInfo, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
+ bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
}
// compact
planStep = 2;
- bool ok = Compact(tableInfo, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
+ bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
UNIT_ASSERT(ok);
- // load
-
- TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
- THashSet<TUnifiedBlobId> lostBlobs;
- engine.Load(db, lostBlobs);
-
// read
// TODO: read old snapshot
@@ -580,7 +562,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
ui64 planStep = 1;
TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
+ TSnapshot indexSnapshot(1, 1);
+ engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
THashSet<TUnifiedBlobId> lostBlobs;
engine.Load(db, lostBlobs);
@@ -596,7 +579,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
+ TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot});
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
// first overload returns ok: it's a postcondition
@@ -634,7 +617,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
+ TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot});
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
bool overload = engine.GetOverloadedGranules(pathId);
@@ -660,6 +643,12 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// insert
ui64 planStep = 1;
+ TSnapshot indexSnapshot(1, 1);
+ TColumnEngineForLogs engine(0, TestLimits());
+ engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
+ THashSet<TUnifiedBlobId> lostBlobs;
+ engine.Load(db, lostBlobs);
+
THashMap<TBlobRange, TString> blobs;
ui64 numRows = 1000;
ui64 rowPos = 0;
@@ -671,25 +660,18 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
+ TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot});
- bool ok = Insert(tableInfo, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
+ bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
}
// compact
planStep = 2;
- bool ok = Compact(tableInfo, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
+ bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
UNIT_ASSERT(ok);
- // load
-
- TColumnEngineForLogs engine(0, TestLimits());
- engine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
- THashSet<TUnifiedBlobId> lostBlobs;
- engine.Load(db, lostBlobs);
-
// read
planStep = 3;
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index f3bb215b8dc..99a5fa59bdb 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -65,13 +65,13 @@ public:
WaitIndexed.erase(event.BlobRange);
IndexedData.AddIndexed(event.BlobRange, event.Data);
} else if (CommittedBlobs.contains(blobId)) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, NOlap::TSnapshot::Zero()});
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId));
if (cmt.empty()) {
return; // ignore duplicates
}
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob.GetSnapshot());
+ IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob);
} else {
LOG_S_ERROR("TEvReadBlobRangeResult returned unexpected blob at tablet "
<< TabletId << " (read)");
@@ -181,12 +181,12 @@ public:
// Add cached batches without read
for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
- auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, NOlap::TSnapshot::Zero()});
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId));
Y_VERIFY(!cmt.empty());
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
- IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.GetSnapshot());
+ IndexedData.AddNotIndexed(batchNo, batch, cmtBlob);
}
LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, "
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 12673e9bdbe..299020b6bba 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2568,7 +2568,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
if (msg->IndexChanges->CompactionInfo) {
++compactionsHappened;
- Cerr << "Compaction at snaphsot "<< msg->IndexChanges->ApplySnapshot
+ Cerr << "Compaction at snaphsot "<< msg->IndexChanges->InitSnapshot
<< " old portions:";
ui64 srcGranule{0};
for (const auto& portionInfo : msg->IndexChanges->SwitchedPortions) {
@@ -2647,6 +2647,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
runtime.GetAppData().Icb->SetValue("ColumnShardControls.BlobCountToTriggerGC", 1, unusedPrev);
}
+ {
+ TAtomic unusedPrev;
+ runtime.GetAppData().Icb->SetValue("ColumnShardControls.MaxPortionsInGranule", 10, unusedPrev);
+ }
+
// Write different keys: grow on compaction
static const ui32 triggerPortionSize = 75 * 1000;