| | | |
|---|---|---|
| author | Artem Zuikov <chertus@gmail.com> | 2022-02-24 15:29:04 +0300 |
| committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-24 15:29:04 +0300 |
| commit | d552a8fa6623c4e1a0f4d1e10c6165449f63e4b4 (patch) | |
| tree | d2d3b970a01d00ef933bbc9b55e0922b69cf454b | |
| parent | e57c847491f07f4e397c63085c808486d3659404 (diff) | |
| download | ydb-d552a8fa6623c4e1a0f4d1e10c6165449f63e4b4.tar.gz | |
KIKIMR-14189 ColumnShard tiered recompaction (Cherry pick commit r9171943)
REVIEW: 2347385
x-ydb-stable-ref: 20fabfaafa4abff449cd20f5c445cc06bf3208c7
27 files changed, 912 insertions, 209 deletions
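For readers skimming the diff below: the core rule this commit introduces in `TColumnEngineForLogs::StartTtl()` compares a portion's maximum TTL-column timestamp against per-tier time borders (current time minus each tier's `EvictAfter`), walking tiers from hottest to coldest. The first border the portion is newer than decides its tier — and triggers eviction if the portion currently sits in a different tier — while data older than every border is dropped. The following is a minimal, hypothetical sketch of that decision; `Tier`, `Portion`, `Verdict`, and `DecideTtl` are illustrative stand-ins, not the real YDB types.

```cpp
// Hypothetical sketch of the tier-selection rule added in
// TColumnEngineForLogs::StartTtl(); simplified types, not the real ones.
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct Tier {
    std::string Name;        // empty name ~ plain "delete after" TTL
    uint64_t BorderUs;       // now - EvictAfter, microseconds since epoch
};

struct Portion {
    std::string CurrentTier;
    uint64_t MaxTimestampUs; // max value of the TTL column in this portion
};

enum class Decision { Keep, Evict, Drop };

struct Verdict {
    Decision Action;
    std::string TargetTier;  // set when Action == Decision::Evict
};

// Tiers must be ordered from hottest (most recent border) to coldest.
Verdict DecideTtl(const Portion& p, const std::vector<Tier>& tiers) {
    for (const auto& tier : tiers) {
        if (tier.BorderUs < p.MaxTimestampUs) {       // newer than this border
            if (p.CurrentTier != tier.Name) {
                return {Decision::Evict, tier.Name};  // recompress into this tier
            }
            return {Decision::Keep, {}};
        }
    }
    return {Decision::Drop, {}};                      // older than the coldest border
}

int main() {
    // now = 100s; "hot" keeps the last 10s, "cold" the last 60s, older data is dropped
    std::vector<Tier> tiers = {{"hot", 90'000'000}, {"cold", 40'000'000}};
    Portion p{"hot", 70'000'000};                     // 30s old -> belongs to "cold"
    Verdict v = DecideTtl(p, tiers);
    if (v.Action == Decision::Evict) {
        std::cout << "evict portion to tier '" << v.TargetTier << "'\n";
    }
}
```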
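The commit also makes eviction recompress column blobs with the target tier's codec (see `UpdateEvictedPortion()` and the new `WriteOptions()`/`ConvertCompression()` helpers). Below is a hedged sketch of how a tier's compression settings could be mapped onto Arrow IPC write options, including the fallback the patch uses when a codec/level combination is unavailable; the local `TCompression` struct merely mirrors the one added to `index_info.h`, and `MakeWriteOptions` is an assumed name.

```cpp
// Hedged sketch: map a tier's compression settings to Arrow IPC write options,
// mirroring the WriteOptions() helper from this commit.
#include <cassert>
#include <memory>
#include <optional>
#include <utility>

#include <arrow/ipc/options.h>
#include <arrow/util/compression.h>

struct TCompression {
    arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME};
    std::optional<int> Level;
};

arrow::ipc::IpcWriteOptions MakeWriteOptions(const TCompression& compression) {
    auto options = arrow::ipc::IpcWriteOptions::Defaults();
    assert(arrow::util::Codec::IsAvailable(compression.Codec));

    // Try codec + explicit level first; fall back to the codec's default level
    // if that combination is not supported by the Arrow build.
    auto codecResult = compression.Level
        ? arrow::util::Codec::Create(compression.Codec, *compression.Level)
        : arrow::util::Codec::Create(compression.Codec);
    if (!codecResult.ok()) {
        codecResult = arrow::util::Codec::Create(compression.Codec);
    }

    options.codec = std::move(*codecResult);  // shared_ptr adopts the unique_ptr
    return options;
}
```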
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index 908fc34a13..0b4b65aac4 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -1134,6 +1134,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false) .SetEnableOlapSchemaOperations(true); TKikimrRunner kikimr(settings); + static ui32 numKinds = 5; CreateTestOlapTable(kikimr); for (ui64 i = 0; i < 100; ++i) { @@ -1151,7 +1152,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 4*3); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), numKinds*3); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 3ull); UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), 1ull); UNIT_ASSERT_GE(GetUint64(rows[0].at("TabletId")), 72075186224037888ull); @@ -1175,6 +1176,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false) .SetEnableOlapSchemaOperations(true); TKikimrRunner kikimr(settings); + static ui32 numKinds = 5; CreateTestOlapTable(kikimr, "olapTable_1"); CreateTestOlapTable(kikimr, "olapTable_2"); @@ -1195,9 +1197,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_GT(rows.size(), 1*4); - UNIT_ASSERT_LE(rows.size(), 3*4); - UNIT_ASSERT_VALUES_EQUAL(rows.size() % 4, 0); // 4 Kinds + UNIT_ASSERT_GT(rows.size(), 1*numKinds); + UNIT_ASSERT_LE(rows.size(), 3*numKinds); + UNIT_ASSERT_VALUES_EQUAL(rows.size() % numKinds, 0); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.front().at("PathId")), 3ull); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.back().at("PathId")), 3ull); } @@ -1210,9 +1212,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_GT(rows.size(), 1*4); - UNIT_ASSERT_LE(rows.size(), 3*4); - UNIT_ASSERT_VALUES_EQUAL(rows.size() % 4, 0); // 4 Kinds + UNIT_ASSERT_GT(rows.size(), 1*numKinds); + UNIT_ASSERT_LE(rows.size(), 3*numKinds); + UNIT_ASSERT_VALUES_EQUAL(rows.size() % numKinds, 0); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.front().at("PathId")), 4ull); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.back().at("PathId")), 4ull); } @@ -1236,6 +1238,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false) .SetEnableOlapSchemaOperations(true); TKikimrRunner kikimr(settings); + static ui32 numKinds = 5; CreateTestOlapTable(kikimr); for (ui64 i = 0; i < 10; ++i) { @@ -1271,7 +1274,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*4); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*numKinds); UNIT_ASSERT_LE(GetUint64(rows[0].at("Bytes")), GetUint64(rows[1].at("Bytes"))); } { @@ -1282,7 +1285,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { )"); auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*4); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*numKinds); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("Rows2")), GetUint64(rows[0].at("Rows3"))); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("Rows")), GetUint64(rows[1].at("Rows3"))); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("Rows")), GetUint64(rows[2].at("Rows2"))); @@ -1296,6 +1299,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false) .SetEnableOlapSchemaOperations(true); TKikimrRunner kikimr(settings); + static ui32 numKinds = 5; CreateTestOlapTable(kikimr, "olapTable_1"); CreateTestOlapTable(kikimr, "olapTable_2"); @@ -1344,11 +1348,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - 
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*3*4); + ui32 numExpected = 3*3*numKinds; + UNIT_ASSERT_VALUES_EQUAL(rows.size(), numExpected); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 5ull); - UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), 4ull); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[35].at("PathId")), 3ull); - UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[35].at("Kind")), 1ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), numKinds); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected-1].at("PathId")), 3ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[numExpected-1].at("Kind")), 1ull); } { @@ -1366,11 +1371,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 2*3*4); + ui32 numExpected = 2*3*numKinds; + UNIT_ASSERT_VALUES_EQUAL(rows.size(), numExpected); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 5ull); - UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), 4ull); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[23].at("PathId")), 3ull); - UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[23].at("Kind")), 1ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), numKinds); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected-1].at("PathId")), 3ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[numExpected-1].at("Kind")), 1ull); } } @@ -1424,7 +1430,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { PRAGMA Kikimr.KqpPushOlapProcess = "true"; SELECT * FROM `/Root/olapStore/.sys/store_primary_index_stats` - WHERE Kind == UInt32("5") + WHERE Kind == UInt32("6") ORDER BY PathId, Kind, TabletId; )"); @@ -1444,7 +1450,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_GE(rows.size(), 3*2); + UNIT_ASSERT_GE(rows.size(), 3*3); } } @@ -1453,6 +1459,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false) .SetEnableOlapSchemaOperations(true); TKikimrRunner kikimr(settings); + static ui32 numKinds = 5; CreateTestOlapTable(kikimr, "olapTable_1"); CreateTestOlapTable(kikimr, "olapTable_2"); @@ -1589,7 +1596,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); // 3 Tables with 3 Shards each and 4 KindId-s of stats - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column0")), 3*3*4); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column0")), 3*3*numKinds); } { @@ -1603,7 +1610,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column0")), 3ull); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column1")), 4ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column1")), numKinds); UNIT_ASSERT_GE(GetUint64(rows[0].at("column2")), 3ull); } @@ -1619,7 +1626,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3ull); for (ui64 pathId = 3, row = 0; pathId <= 5; ++pathId, ++row) { UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[row].at("PathId")), pathId); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[row].at("column1")), 12); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[row].at("column1")), 3*numKinds); } } } diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index ada7786f48..8c42878bab 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -43,6 +43,11 @@ enum ESimpleCounters { COUNTER_INACTIVE_BYTES = 33 [(CounterOpts) = {Name: "Index/InactiveBytes"}]; COUNTER_INACTIVE_RAW_BYTES = 34 [(CounterOpts) = {Name: 
"Index/InactiveBytesRaw"}]; COUNTER_SCAN_IN_FLY = 35 [(CounterOpts) = {Name: "ScanTxInFly"}]; + COUNTER_EVICTED_PORTIONS = 36 [(CounterOpts) = {Name: "Index/EvictedPortions"}]; + COUNTER_EVICTED_BLOBS = 37 [(CounterOpts) = {Name: "Index/EvictedBlobs"}]; + COUNTER_EVICTED_ROWS = 38 [(CounterOpts) = {Name: "Index/EvictedRows"}]; + COUNTER_EVICTED_BYTES = 39 [(CounterOpts) = {Name: "Index/EvictedBytes"}]; + COUNTER_EVICTED_RAW_BYTES = 40 [(CounterOpts) = {Name: "Index/EvictedBytesRaw"}]; } enum ECumulativeCounters { @@ -110,6 +115,9 @@ enum ECumulativeCounters { COUNTER_SMALL_BLOB_READ_BYTES = 61 [(CounterOpts) = {Name: "SmallBlobReadBytes"}]; COUNTER_SMALL_BLOB_DELETE_COUNT = 62 [(CounterOpts) = {Name: "SmallBlobDeleteCount"}]; COUNTER_SMALL_BLOB_DELETE_BYTES = 63 [(CounterOpts) = {Name: "SmallBlobDeleteBytes"}]; + COUNTER_EVICTION_PORTIONS_WRITTEN = 64 [(CounterOpts) = {Name: "EvictionPortionsWritten"}]; + COUNTER_EVICTION_BLOBS_WRITTEN = 65 [(CounterOpts) = {Name: "EvictionBlobsWritten"}]; + COUNTER_EVICTION_BYTES_WRITTEN = 66 [(CounterOpts) = {Name: "EvictionBytesWritten"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 019cd5ed67..06a1f67278 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -794,6 +794,7 @@ message TActivity { TX_COLUMNSHARD_READ_ACTOR = 497; TX_COLUMNSHARD_INDEXING_ACTOR = 498; TX_COLUMNSHARD_COMPACTION_ACTOR = 499; + TX_COLUMNSHARD_EVICTION_ACTOR = 505; SCHEME_BOARD_MONITORING_ACTOR = 478; SCHEME_BOARD_INFO_REQUESTER_ACTOR = 479; METERING_WRITER_ACTOR = 480; diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index d407b2cc41..f79c256a5b 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -231,7 +231,9 @@ message TIndexPortionMeta { bool IsInserted = 1; bool IsCompacted = 2; bool IsSplitCompacted = 3; + bool IsEvicted = 4; } + optional string TierName = 5; } message TIndexColumnMeta { diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index cc4017b5bc..fd0c913ca3 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -13,6 +13,7 @@ namespace NKikimr::NColumnShard { IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent); IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent); +IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent); IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max()); @@ -27,6 +28,7 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) ctx.Send(Tablet(), new TEvents::TEvPoisonPill); ctx.Send(IndexingActor, new TEvents::TEvPoisonPill); ctx.Send(CompactionActor, new TEvents::TEvPoisonPill); + ctx.Send(EvictionActor, new TEvents::TEvPoisonPill); } void TColumnShard::SwitchToWork(const TActorContext& ctx) { @@ -34,6 +36,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { LOG_S_INFO("Switched to work at " << TabletID() << " actor " << ctx.SelfID); IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID)); CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID)); + EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID)); SignalTabletActive(ctx); } @@ -308,6 +311,12 @@ void 
TColumnShard::UpdateIndexCounters() { SetCounter(COUNTER_INACTIVE_ROWS, stats.Inactive.Rows); SetCounter(COUNTER_INACTIVE_BYTES, stats.Inactive.Bytes); SetCounter(COUNTER_INACTIVE_RAW_BYTES, stats.Inactive.RawBytes); + + SetCounter(COUNTER_EVICTED_PORTIONS, stats.Evicted.Portions); + SetCounter(COUNTER_EVICTED_BLOBS, stats.Evicted.Blobs); + SetCounter(COUNTER_EVICTED_ROWS, stats.Evicted.Rows); + SetCounter(COUNTER_EVICTED_BYTES, stats.Evicted.Bytes); + SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.Evicted.RawBytes); } void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index a690aec811..1a5420361b 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -271,7 +271,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size()); if (!schemaPreset.empty()) { - Self->SetPrimaryIndex(std::move(schemaPreset), Self->Ttl.TtlColumns()); + Self->SetPrimaryIndex(std::move(schemaPreset)); } { // Load long tx writes diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index c8d8d450ff..352500107f 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -175,19 +175,16 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex } // If no paths trigger schema defined TTL - THashMap<ui64, NOlap::TTtlInfo> pathTtls; + THashMap<ui64, NOlap::TTiersInfo> pathTtls; if (!ttlBody.GetPathIds().empty()) { - ui64 unixTimeSec = ttlBody.GetUnixTimeSeconds(); - auto ts = std::make_shared<arrow::TimestampScalar>(unixTimeSec * 1000 * 1000, - arrow::timestamp(arrow::TimeUnit::MICRO)); - TString columnName = ttlBody.GetTtlColumnName(); - - if (!unixTimeSec || !ts->value) { + auto unixTime = TInstant::Seconds(ttlBody.GetUnixTimeSeconds()); + if (!unixTime) { statusMessage = "TTL tx wrong timestamp"; status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; break; } + TString columnName = ttlBody.GetTtlColumnName(); if (columnName.empty()) { statusMessage = "TTL tx wrong TTL column"; status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; @@ -195,12 +192,16 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex } for (ui64 pathId : ttlBody.GetPathIds()) { - pathTtls.emplace(pathId, NOlap::TTtlInfo{columnName, ts}); + pathTtls.emplace(pathId, NOlap::TTiersInfo(columnName, unixTime)); } } if (auto event = Self->SetupTtl(pathTtls, true)) { - ctx.Send(Self->SelfId(), event.release()); + if (event->NeedWrites()) { + ctx.Send(Self->EvictionActor, event.release()); + } else { + ctx.Send(Self->SelfId(), event->TxEvent.release()); + } status = NKikimrTxColumnShard::EResultStatus::SUCCESS; } diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index 5400d8660d..770d683895 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -75,8 +75,8 @@ private: TMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats; - static constexpr const ui64 NUM_KINDS = 4; - static_assert(NUM_KINDS == NOlap::TPortionMeta::INACTIVE, "NUM_KINDS must match NOlap::TPortionMeta::EProduced enum"); + static constexpr const ui64 NUM_KINDS = 5; + 
static_assert(NUM_KINDS == NOlap::TPortionMeta::EVICTED, "NUM_KINDS must match NOlap::TPortionMeta::EProduced enum"); std::shared_ptr<arrow::RecordBatch> FillStatsBatch() { @@ -132,40 +132,51 @@ private: using TUInt64 = arrow::UInt64Type::c_type; using TUInt32 = arrow::UInt32Type::c_type; - TUInt64 pathIds[NUM_KINDS] = {pathId, pathId, pathId, pathId}; - /// It's better to keep it in sync with TPortionMeta::EProduced - TUInt32 kinds[NUM_KINDS] = {1, 2, 3, 4}; + TUInt64 pathIds[NUM_KINDS] = {pathId, pathId, pathId, pathId, pathId}; + /// It's in sync with TPortionMeta::EProduced + TUInt32 kinds[NUM_KINDS] = { + (ui32)NOlap::TPortionMeta::INSERTED, + (ui32)NOlap::TPortionMeta::COMPACTED, + (ui32)NOlap::TPortionMeta::SPLIT_COMPACTED, + (ui32)NOlap::TPortionMeta::INACTIVE, + (ui32)NOlap::TPortionMeta::EVICTED + }; ui64 tabletId = ReadMetadata->TabletId; - TUInt64 tabletIds[NUM_KINDS] = {tabletId, tabletId, tabletId, tabletId}; + TUInt64 tabletIds[NUM_KINDS] = {tabletId, tabletId, tabletId, tabletId, tabletId}; TUInt64 rows[NUM_KINDS] = { stats.Inserted.Rows, stats.Compacted.Rows, stats.SplitCompacted.Rows, - stats.Inactive.Rows + stats.Inactive.Rows, + stats.Evicted.Rows }; TUInt64 bytes[NUM_KINDS] = { stats.Inserted.Bytes, stats.Compacted.Bytes, stats.SplitCompacted.Bytes, - stats.Inactive.Bytes + stats.Inactive.Bytes, + stats.Evicted.Bytes }; TUInt64 rawBytes[NUM_KINDS] = { stats.Inserted.RawBytes, stats.Compacted.RawBytes, stats.SplitCompacted.RawBytes, - stats.Inactive.RawBytes + stats.Inactive.RawBytes, + stats.Evicted.RawBytes }; TUInt64 portions[NUM_KINDS] = { stats.Inserted.Portions, stats.Compacted.Portions, stats.SplitCompacted.Portions, - stats.Inactive.Portions + stats.Inactive.Portions, + stats.Evicted.Portions }; TUInt64 blobs[NUM_KINDS] = { stats.Inserted.Blobs, stats.Compacted.Blobs, stats.SplitCompacted.Blobs, - stats.Inactive.Blobs + stats.Inactive.Blobs, + stats.Evicted.Blobs }; if (Reverse) { diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 08f3982efc..7b6b67a26b 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -36,12 +36,12 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { << ") apply changes: " << *changes << " at tablet " << Self->TabletID()); TBlobManagerDb blobManagerDb(txc.DB); - for (const auto& cmtd : Ev->Get()->IndexChanges->DataToIndex) { + for (const auto& cmtd : changes->DataToIndex) { Self->InsertTable->EraseCommitted(dbWrap, cmtd); Self->BlobManager->DeleteBlob(cmtd.BlobId, blobManagerDb); } - const auto& switchedPortions = Ev->Get()->IndexChanges->SwitchedPortions; + const auto& switchedPortions = changes->SwitchedPortions; Self->IncCounter(COUNTER_PORTIONS_DEACTIVATED, switchedPortions.size()); THashSet<TUnifiedBlobId> blobsDeactivated; @@ -57,7 +57,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { Self->IncCounter(COUNTER_BYTES_DEACTIVATED, blobId.BlobSize()); } - for (auto& portionInfo : Ev->Get()->IndexChanges->AppendedPortions) { + for (auto& portionInfo : changes->AppendedPortions) { switch (portionInfo.Meta.Produced) { case NOlap::TPortionMeta::UNSPECIFIED: Y_VERIFY(false); // unexpected @@ -70,6 +70,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { case NOlap::TPortionMeta::SPLIT_COMPACTED: Self->IncCounter(COUNTER_SPLIT_COMPACTION_PORTIONS_WRITTEN); break; + case NOlap::TPortionMeta::EVICTED: + 
Y_FAIL("Unexpected evicted case"); + break; case NOlap::TPortionMeta::INACTIVE: Y_FAIL("Unexpected inactive case"); break; @@ -78,7 +81,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { // Put newly created blobs into cache if (Ev->Get()->CacheData) { for (const auto& columnRec : portionInfo.Records) { - const auto* blob = Ev->Get()->IndexChanges->Blobs.FindPtr(columnRec.BlobRange); + const auto* blob = changes->Blobs.FindPtr(columnRec.BlobRange); Y_VERIFY_DEBUG(blob, "Column data must be passed if CacheData is set"); if (blob) { Y_VERIFY(columnRec.BlobRange.Size == blob->Size()); @@ -88,7 +91,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { } } - const auto& portionsToDrop = Ev->Get()->IndexChanges->PortionsToDrop; + Self->IncCounter(COUNTER_EVICTION_PORTIONS_WRITTEN, changes->PortionsToEvict.size()); + + const auto& portionsToDrop = changes->PortionsToDrop; THashSet<TUnifiedBlobId> blobsToDrop; Self->IncCounter(COUNTER_PORTIONS_ERASED, portionsToDrop.size()); for (const auto& portionInfo : portionsToDrop) { @@ -98,6 +103,12 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { Self->IncCounter(COUNTER_RAW_BYTES_ERASED, portionInfo.RawBytesSum()); } + // Note: RAW_BYTES_ERASED and BYTES_ERASED counters are not in sync for evicted data + const auto& evictedRecords = changes->EvictedRecords; + for (const auto& rec : evictedRecords) { + blobsToDrop.insert(rec.BlobRange.BlobId); + } + Self->IncCounter(COUNTER_BLOBS_ERASED, blobsToDrop.size()); for (const auto& blobId : blobsToDrop) { Self->BlobManager->DeleteBlob(blobId, blobManagerDb); @@ -152,6 +163,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { Self->ActiveTtl = false; Self->IncCounter(ok ? 
COUNTER_TTL_SUCCESS : COUNTER_TTL_FAIL); + Self->IncCounter(COUNTER_EVICTION_BLOBS_WRITTEN, blobsWritten); + Self->IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten); } Self->UpdateResourceMetrics(Ev->Get()->ResourceUsage); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 10dcea4d86..b1b866aa76 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -412,7 +412,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl schemaPreset.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(schemaPresetVerProto.GetSchema())); - SetPrimaryIndex(std::move(schemaPreset), Ttl.TtlColumns()); + SetPrimaryIndex(std::move(schemaPreset)); } tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj()); @@ -544,14 +544,13 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, #endif if (!schemaPreset.empty()) { - SetPrimaryIndex(std::move(schemaPreset), Ttl.TtlColumns()); + SetPrimaryIndex(std::move(schemaPreset)); } } -void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions, - const THashSet<TString>& ttlColumns) { +void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions) { for (auto& [snap, indexInfo] : schemaVersions) { - for (auto& columnName : ttlColumns) { + for (auto& columnName : Ttl.TtlColumns()) { indexInfo.AddTtlColumn(columnName); } @@ -587,7 +586,11 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) { } if (auto event = SetupTtl()) { - ctx.Send(SelfId(), event.release()); + if (event->NeedWrites()) { + ctx.Send(EvictionActor, event.release()); + } else { + ctx.Send(SelfId(), event->TxEvent.release()); + } } LastBackActivation = TInstant::Now(); @@ -698,8 +701,8 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev)); } -std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTtlInfo>& pathTtls, - bool force) { +std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls, + bool force) { if (ActiveTtl) { LOG_S_DEBUG("Ttl already in progress at tablet " << TabletID()); return {}; @@ -709,9 +712,9 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupTtl(const THashMap return {}; } - THashMap<ui64, NOlap::TTtlInfo> regularTtls; + THashMap<ui64, NOlap::TTiersInfo> regularTtls; if (pathTtls.empty()) { - regularTtls = Ttl.MakeIndexTtlMap(force); + regularTtls = Ttl.MakeIndexTtlMap(TInstant::Now(), force); } if (pathTtls.empty() && regularTtls.empty()) { @@ -733,10 +736,11 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupTtl(const THashMap return {}; } + bool needWrites = !indexChanges->PortionsToEvict.empty(); + ActiveTtl = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, false); - ev->PutStatus = NKikimrProto::OK; // No blobs to write, start TTxWriteIndex in event handler - return ev; + return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), needWrites); } std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { @@ -792,6 +796,28 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return ev; } +static NOlap::TCompression ConvertCompression(const 
NKikimrSchemeOp::TCompressionOptions& compression) { + NOlap::TCompression out; + if (compression.HasCompressionCodec()) { + switch (compression.GetCompressionCodec()) { + case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain: + out.Codec = arrow::Compression::UNCOMPRESSED; + break; + case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4: + out.Codec = arrow::Compression::LZ4_FRAME; + break; + case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD: + out.Codec = arrow::Compression::ZSTD; + break; + } + } + + if (compression.HasCompressionLevel()) { + out.Level = compression.GetCompressionLevel(); + } + return out; +} + NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) { Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); @@ -812,29 +838,17 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl } if (schema.HasDefaultCompression()) { - auto& compression = schema.GetDefaultCompression(); - if (compression.HasCompressionCodec()) { - arrow::Compression::type codec = arrow::Compression::LZ4_FRAME; - switch (compression.GetCompressionCodec()) { - case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain: - codec = arrow::Compression::UNCOMPRESSED; - break; - case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4: - codec = arrow::Compression::LZ4_FRAME; // TODO: should ColumnCodecLZ4 be mapped to LZ4 (row variant)? - break; - case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD: - codec = arrow::Compression::ZSTD; - break; - } - indexInfo.SetDefaultCompressionCodec(codec); - } + NOlap::TCompression compression = ConvertCompression(schema.GetDefaultCompression()); + indexInfo.SetDefaultCompression(compression); + } - if (compression.HasCompressionLevel()) { - int level = compression.GetCompressionLevel(); - indexInfo.SetDefaultCompressionLevel(level); - } else { - indexInfo.SetDefaultCompressionLevel(); // set default + for (auto& tierConfig : schema.GetStorageTiers()) { + NOlap::TStorageTier tier; + tier.Name = tierConfig.GetName(); + if (tierConfig.HasCompression()) { + tier.Compression = ConvertCompression(tierConfig.GetCompression()); } + indexInfo.AddStorageTier(std::move(tier)); } return indexInfo; diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 4c3c73fd63..b067efa3d7 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -45,6 +45,7 @@ class TColumnShard { friend class TIndexingActor; friend class TCompactionActor; + friend class TEvictionActor; friend class TTxInit; friend class TTxInitSchema; friend class TTxUpdateSchema; @@ -307,6 +308,7 @@ private: TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices. TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compation. 
+ TActorId EvictionActor; std::unique_ptr<TTabletCountersBase> TabletCountersPtr; TTabletCountersBase* TabletCounters; std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache; @@ -374,14 +376,14 @@ private: void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); - void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions, const THashSet<TString>& ttlColumns); + void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions); NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation(); std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction(); - std::unique_ptr<TEvPrivate::TEvWriteIndex> SetupTtl(const THashMap<ui64, NOlap::TTtlInfo>& pathTtls = {}, - bool force = false); + std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls = {}, + bool force = false); std::unique_ptr<TEvPrivate::TEvWriteIndex> SetupCleanup(); void UpdateBlobMangerCounters(); diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h index 3dc52a0d8a..91802398ca 100644 --- a/ydb/core/tx/columnshard/columnshard_ttl.h +++ b/ydb/core/tx/columnshard/columnshard_ttl.h @@ -9,37 +9,52 @@ public: struct TEviction { TString TierName; - TString ColumnName; - TDuration ExpireAfter; + TDuration EvictAfter; }; struct TDescription { + TString ColumnName; std::vector<TEviction> Evictions; TDescription() = default; TDescription(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl) { + TDuration prevEvicSec; + if (ttl.HasEnabled()) { auto& enabled = ttl.GetEnabled(); - Evictions.emplace_back( - TEviction{{}, enabled.GetColumnName(), TDuration::Seconds(enabled.GetExpireAfterSeconds())}); + ColumnName = enabled.GetColumnName(); + auto expireSec = TDuration::Seconds(enabled.GetExpireAfterSeconds()); + + Evictions.reserve(1); + Evictions.emplace_back(TEviction{{}, expireSec}); } else if (ttl.HasTiering()) { + Evictions.reserve(ttl.GetTiering().TiersSize()); + for (auto& tier : ttl.GetTiering().GetTiers()) { auto& eviction = tier.GetEviction(); - Evictions.emplace_back(TEviction{tier.GetName(), - eviction.GetColumnName(), TDuration::Seconds(eviction.GetExpireAfterSeconds())}); + Y_VERIFY(ColumnName.empty() || ColumnName == eviction.GetColumnName()); + ColumnName = eviction.GetColumnName(); + auto evictSec = TDuration::Seconds(eviction.GetExpireAfterSeconds()); + + // Ignore next tier if it has smaller eviction time. Prefer first tier with same eviction time. 
+ if (evictSec > prevEvicSec) { + Evictions.emplace_back(TEviction{tier.GetName(), evictSec}); + prevEvicSec = evictSec; + } } + + Evictions.shrink_to_fit(); + } + + if (Enabled()) { + Y_VERIFY(!ColumnName.empty()); } } bool Enabled() const { return !Evictions.empty(); } - - const TEviction& LastTier() const { - Y_VERIFY(!Evictions.empty()); - return Evictions.back(); - } }; ui64 PathsCount() const { @@ -48,10 +63,11 @@ public: void SetPathTtl(ui64 pathId, TDescription&& descr) { if (descr.Enabled()) { - for (auto& evict : descr.Evictions) { - if (!evict.ColumnName.empty()) { - Columns.insert(evict.ColumnName); - } + auto it = Columns.find(descr.ColumnName); + if (it != Columns.end()) { + descr.ColumnName = *it; // replace string dups (memory efficiency) + } else { + Columns.insert(descr.ColumnName); } PathTtls[pathId] = descr; } else { @@ -63,17 +79,17 @@ public: PathTtls.erase(pathId); } - THashMap<ui64, NOlap::TTtlInfo> MakeIndexTtlMap(bool force = false) { - if ((TInstant::Now() < LastRegularTtl + RegularTtlTimeout) && !force) { + THashMap<ui64, NOlap::TTiersInfo> MakeIndexTtlMap(TInstant now, bool force = false) { + if ((now < LastRegularTtl + RegularTtlTimeout) && !force) { return {}; } - THashMap<ui64, NOlap::TTtlInfo> out; + THashMap<ui64, NOlap::TTiersInfo> out; for (auto& [pathId, descr] : PathTtls) { - out.emplace(pathId, Convert(descr)); + out.emplace(pathId, Convert(descr, now)); } - LastRegularTtl = TInstant::Now(); + LastRegularTtl = now; return out; } @@ -85,13 +101,16 @@ private: TDuration RegularTtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)}; TInstant LastRegularTtl; - NOlap::TTtlInfo Convert(const TDescription& descr) const { - auto& lastTier = descr.LastTier(); - ui64 border = (TInstant::Now() - lastTier.ExpireAfter).MicroSeconds(); - return NOlap::TTtlInfo{ - .Column = lastTier.ColumnName, - .Border = std::make_shared<arrow::TimestampScalar>(border, arrow::timestamp(arrow::TimeUnit::MICRO)) - }; + NOlap::TTiersInfo Convert(const TDescription& descr, TInstant timePoint) const { + Y_VERIFY(descr.Enabled()); + NOlap::TTiersInfo out(descr.ColumnName); + + for (auto& tier : descr.Evictions) { + auto border = timePoint - tier.EvictAfter; + out.AddTier(tier.TierName, border); + } + + return out; } }; diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h index b7267dd8ff..a5662a6876 100644 --- a/ydb/core/tx/columnshard/columnshard_txs.h +++ b/ydb/core/tx/columnshard/columnshard_txs.h @@ -23,6 +23,7 @@ struct TEvPrivate { EvScanStats, EvReadFinished, EvPeriodicWakeup, + EvEviction, EvEnd }; @@ -68,6 +69,22 @@ struct TEvPrivate { } }; + struct TEvEviction : public TEventLocal<TEvEviction, EvEviction> { + std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent; + + explicit TEvEviction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent, bool needWrites) + : TxEvent(std::move(txEvent)) + { + if (!needWrites) { + TxEvent->PutStatus = NKikimrProto::OK; + } + } + + bool NeedWrites() const { + return (TxEvent->PutStatus != NKikimrProto::OK); + } + }; + struct TEvScanStats : public TEventLocal<TEvScanStats, EvScanStats> { TEvScanStats(ui64 rows, ui64 bytes) : Rows(rows), Bytes(bytes) {} ui64 Rows; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 139e83198a..4920b5c694 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -53,8 +53,11 @@ struct TTestSchema { return !Codec.empty(); } - TStorageTier& 
SetCodec(const TString& codec) { + TStorageTier& SetCodec(const TString& codec, std::optional<int> level = {}) { Codec = codec; + if (level) { + CompressionLevel = *level; + } return *this; } @@ -186,6 +189,7 @@ struct TTestSchema { auto* tiering = ttlSettings->MutableTiering(); for (auto& tier : specials.Tiers) { auto* t = tiering->AddTiers(); + t->SetName(tier.Name); t->MutableEviction()->SetColumnName(tier.TtlColumn); t->MutableEviction()->SetExpireAfterSeconds(*tier.EvictAfterSeconds); } @@ -215,6 +219,7 @@ struct TTestSchema { auto* tiering = ttlSettings->MutableTiering(); for (auto& tier : specials.Tiers) { auto* t = tiering->AddTiers(); + t->SetName(tier.Name); t->MutableEviction()->SetColumnName(tier.TtlColumn); t->MutableEviction()->SetExpireAfterSeconds(*tier.EvictAfterSeconds); } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index bad39f81ca..8fb6ac424a 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -57,9 +57,44 @@ struct TCompactionInfo { } }; -struct TTtlInfo { +struct TTiersInfo { + struct TTierTimeBorder { + TString TierName; + TInstant EvictBorder; + + TTierTimeBorder(TString tierName, TInstant evictBorder) + : TierName(tierName) + , EvictBorder(evictBorder) + {} + + std::shared_ptr<arrow::Scalar> ToTimestamp() const { + if (Scalar) { + return Scalar; + } + + Scalar = std::make_shared<arrow::TimestampScalar>( + EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); + return Scalar; + } + + private: + mutable std::shared_ptr<arrow::Scalar> Scalar; + }; + TString Column; - std::shared_ptr<arrow::Scalar> Border; + std::vector<TTierTimeBorder> TierBorders; // Ordered tiers from hottest to coldest + + TTiersInfo(const TString& column, TInstant border = {}, const TString& tierName = {}) + : Column(column) + { + if (border) { + AddTier(tierName, border); + } + } + + void AddTier(const TString& tierName, TInstant border) { + TierBorders.emplace_back(TTierTimeBorder(tierName, border)); + } }; class TColumnEngineChanges { @@ -92,6 +127,8 @@ public: TVector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones TVector<TPortionInfo> AppendedPortions; // New portions after indexing or compaction TVector<TPortionInfo> PortionsToDrop; + TVector<std::pair<TPortionInfo, TString>> PortionsToEvict; // {portion, target tier name} + TVector<TColumnRecord> EvictedRecords; TVector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule} THashMap<TBlobRange, TString> Blobs; @@ -146,6 +183,13 @@ public: } out << "; "; } + if (ui32 evicted = changes.PortionsToEvict.size()) { + out << "evict " << evicted << " portions"; + for (auto& [portionInfo, tier] : changes.PortionsToEvict) { + out << portionInfo << " (to " << tier << ")"; + } + out << "; "; + } if (ui32 dropped = changes.PortionsToDrop.size()) { out << "drop " << dropped << " portions"; for (auto& portionInfo : changes.PortionsToDrop) { @@ -260,6 +304,7 @@ struct TColumnEngineStats { TPortionsStats Compacted{}; TPortionsStats SplitCompacted{}; TPortionsStats Inactive{}; + TPortionsStats Evicted{}; void Clear() { *this = {}; @@ -288,7 +333,7 @@ public: const TSnapshot& outdatedSnapshot) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop) = 0; - virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTtlInfo>& pathTtls) = 0; + virtual 
std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) = 0; virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0; virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0; //virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 6fc16a5e3f..e1669b286d 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -15,12 +15,14 @@ std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TIndexInfo& inde namespace { -arrow::ipc::IpcWriteOptions WriteOptions(arrow::Compression::type codec, const std::optional<int>& level) { +arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) { + auto& codec = compression.Codec; + arrow::ipc::IpcWriteOptions options(arrow::ipc::IpcWriteOptions::Defaults()); Y_VERIFY(arrow::util::Codec::IsAvailable(codec)); arrow::Result<std::unique_ptr<arrow::util::Codec>> resCodec; - if (level) { - resCodec = arrow::util::Codec::Create(codec, *level); + if (compression.Level) { + resCodec = arrow::util::Codec::Create(codec, *compression.Level); if (!resCodec.ok()) { resCodec = arrow::util::Codec::Create(codec); } @@ -49,7 +51,9 @@ ui64 ExtractTimestamp(const std::shared_ptr<TPredicate>& pkPredicate, const std: // Although source batches are ordered only by PK (sorting key) resulting pathBatches are ordered by extended key. // They have const snapshot columns that do not break sorting inside batch. -std::shared_ptr<arrow::RecordBatch> AddSpecials(const TIndexInfo& indexInfo, const TInsertedData& inserted, const TString& data) { +std::shared_ptr<arrow::RecordBatch> AddSpecials(const TIndexInfo& indexInfo, const TInsertedData& inserted, + const TString& data) +{ auto schema = indexInfo.ArrowSchema(); Y_VERIFY(!data.empty(), "Blob data not present"); auto batch = NArrow::DeserializeBatch(data, schema); @@ -70,6 +74,50 @@ std::shared_ptr<arrow::RecordBatch> AddSpecials(const TIndexInfo& indexInfo, con return arrow::RecordBatch::Make(extendedSchema, numRows, columns); } +bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, const TString& tierName, + const THashMap<TBlobRange, TString>& srcBlobs, + TVector<TColumnRecord>& evictedRecords, TVector<TString>& newBlobs) +{ + Y_VERIFY(portionInfo.TierName != tierName); + + auto compression = indexInfo.GetTierCompression(tierName); + if (!compression) { + // Noting to recompress. We have no other kinds of evictions yet. Return. 
+ portionInfo.TierName = tierName; + return true; + } + + auto schema = indexInfo.ArrowSchemaWithSpecials(); + auto batch = portionInfo.AssembleInBatch(indexInfo, schema, srcBlobs); + auto writeOptions = WriteOptions(*compression); + + TPortionInfo undo = portionInfo; + size_t undoSize = newBlobs.size(); + + std::vector<TString> blobs; + for (auto& rec : portionInfo.Records) { + auto colName = indexInfo.GetColumnName(rec.ColumnId); + std::string name(colName.data(), colName.size()); + auto field = schema->GetFieldByName(name); + + auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(name), field, writeOptions); + if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { + portionInfo = undo; + newBlobs.resize(undoSize); + return false; + } + newBlobs.emplace_back(std::move(blob)); + rec.BlobRange = TBlobRange{}; + } + + for (auto& rec : undo.Records) { + evictedRecords.emplace_back(std::move(rec)); + } + + portionInfo.AddMetadata(indexInfo, batch, tierName); + return true; +} + TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo, std::shared_ptr<arrow::RecordBatch> batch, ui64 granule, @@ -79,7 +127,12 @@ TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo, auto schema = indexInfo.ArrowSchemaWithSpecials(); TVector<TPortionInfo> out; - auto writeOptions = WriteOptions(indexInfo.CompressionCodec(), indexInfo.CompressionLevel()); + TString tierName = indexInfo.GetTierName(0); + auto compression = indexInfo.GetTierCompression(0); + if (!compression) { + compression = indexInfo.GetDefaultCompression(); + } + auto writeOptions = WriteOptions(*compression); std::shared_ptr<arrow::RecordBatch> portionBatch = batch; for (i32 pos = 0; pos < batch->num_rows();) { @@ -111,7 +164,7 @@ TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo, } if (ok) { - portionInfo.AddMetadata(indexInfo, portionBatch); + portionInfo.AddMetadata(indexInfo, portionBatch, tierName); out.emplace_back(std::move(portionInfo)); for (auto& blob : portionBlobs) { blobs.push_back(blob); @@ -341,7 +394,8 @@ ui64 TColumnEngineForLogs::MemoryUsage() const { ui64 numPortions = Counters.Inserted.Portions + Counters.Compacted.Portions + Counters.SplitCompacted.Portions + - Counters.Inactive.Portions; + Counters.Inactive.Portions + + Counters.Evicted.Portions; return Counters.Granules * (sizeof(TGranuleMeta) + sizeof(ui64)) + numPortions * (sizeof(TPortionInfo) + sizeof(ui64)) + @@ -414,6 +468,9 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c case NOlap::TPortionMeta::INACTIVE: srcStats = &engineStats.Inactive; break; + case NOlap::TPortionMeta::EVICTED: + srcStats = &engineStats.Evicted; + break; } Y_VERIFY(srcStats); auto* stats = portionInfo.IsActive() ? 
srcStats : &engineStats.Inactive; @@ -699,7 +756,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T return changes; } -std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTtlInfo>& pathTtls) { +std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) { if (pathTtls.empty()) { return {}; } @@ -716,6 +773,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash continue; } + Y_VERIFY(!ttl.TierBorders.empty()); + ui32 ttlColumnId = IndexInfo.GetColumnId(ttl.Column); auto& tsGranule = PathGranules[pathId]; for (auto [ts, granule] : tsGranule) { @@ -723,8 +782,23 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash Y_VERIFY(spg); for (auto& [portion, info] : spg->Portions) { + if (!info.IsActive()) { + continue; + } + if (auto max = info.MaxValue(ttlColumnId)) { - if (NArrow::ScalarLess(*max, *ttl.Border)) { + bool keep = false; + for (auto& border : ttl.TierBorders) { + if (NArrow::ScalarLess(*border.ToTimestamp(), *max)) { + keep = true; + if (info.TierName != border.TierName) { + changes->PortionsToEvict.emplace_back(info, border.TierName); + } + break; + } + } + if (!keep) { + Y_VERIFY(!NArrow::ScalarLess(*ttl.TierBorders.back().ToTimestamp(), *max)); changes->PortionsToDrop.push_back(info); } } @@ -732,7 +806,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash } } - if (changes->PortionsToDrop.empty()) { + if (changes->PortionsToDrop.empty() && + changes->PortionsToEvict.empty()) { return {}; } @@ -815,17 +890,19 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE for (auto& portionInfo : changes->AppendedPortions) { portionInfo.UpdateRecords(++portion, granuleRemap, snapshot); - portionInfo.Meta.Produced = TPortionMeta::INSERTED; + TPortionMeta::EProduced produced = TPortionMeta::INSERTED; // If it's a split compaction with moves appended portions are INSERTED (could have overlaps with others) if (changes->IsCompaction() && changes->PortionsToMove.empty()) { Y_VERIFY(changes->CompactionInfo); - portionInfo.Meta.Produced = changes->CompactionInfo->InGranule ? + produced = changes->CompactionInfo->InGranule ? TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED; } - for (auto& record : portionInfo.Records) { - record.Metadata = portionInfo.GetMetadata(record); - } + portionInfo.UpdateRecordsMeta(produced); + } + + for (auto& [portionInfo, _] : changes->PortionsToEvict) { + portionInfo.UpdateRecordsMeta(TPortionMeta::EVICTED); } for (auto& [_, id] : changes->PortionsToMove) { @@ -940,7 +1017,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, ui64 granule = portionInfo.Granule(); if (!Granules.count(granule)) { - LOG_S_ERROR("Cannot update portions with unknown granule " << granule << " at tablet " << TabletId); + LOG_S_ERROR("Cannot update portion " << portionInfo << " with unknown granule at tablet " << TabletId); return false; } @@ -970,6 +1047,43 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, } } + // Update evicted portions + // There could be race between compaction and eviction. Allow compaction and disallow eviction in this case. 
+ + for (auto& [info, _] : changes.PortionsToEvict) { + const auto& portionInfo = info; + Y_VERIFY(!portionInfo.Empty()); + Y_VERIFY(portionInfo.IsActive()); + + ui64 granule = portionInfo.Granule(); + ui64 portion = portionInfo.Portion(); + if (!Granules.count(granule) || !Granules[granule]->Portions.count(portion)) { + LOG_S_ERROR("Cannot evict unknown portion " << portionInfo << " at tablet " << TabletId); + return false; + } + + // In case of race with compaction portion could become inactive + // TODO: evict others instead of abort eviction + auto& oldInfo = Granules[granule]->Portions[portion]; + if (!oldInfo.IsActive()) { + LOG_S_WARN("Cannot evict inactive portion " << oldInfo << " at tablet " << TabletId); + return false; + } + Y_VERIFY(portionInfo.TierName != oldInfo.TierName); + + // TODO: update stats + if (!UpsertPortion(portionInfo, apply, false)) { + LOG_S_ERROR("Cannot evict portion " << portionInfo << " at tablet " << TabletId); + return false; + } + + if (apply) { + for (auto& record : portionInfo.Records) { + ColumnsTable->Write(db, record); + } + } + } + // Move portions in granules (zero-copy switch + append into new granules) for (auto& [info, granule] : changes.PortionsToMove) { @@ -1795,4 +1909,29 @@ TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo, return CompactSplitGranule(indexInfo, castedChanges); } +TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo, + std::shared_ptr<TColumnEngineChanges> changes) { + Y_VERIFY(changes); + Y_VERIFY(!changes->Blobs.empty()); // src data + Y_VERIFY(!changes->PortionsToEvict.empty()); // src meta + Y_VERIFY(changes->EvictedRecords.empty()); // dst meta + + TVector<TString> newBlobs; + TVector<std::pair<TPortionInfo, TString>> evicted; + evicted.reserve(changes->PortionsToEvict.size()); + + for (auto& [portionInfo, tierName] : changes->PortionsToEvict) { + Y_VERIFY(!portionInfo.Empty()); + Y_VERIFY(portionInfo.IsActive()); + + if (UpdateEvictedPortion(portionInfo, indexInfo, tierName, changes->Blobs, changes->EvictedRecords, newBlobs)) { + Y_VERIFY(portionInfo.TierName == tierName); + evicted.emplace_back(std::move(portionInfo), TString{}); + } + } + + changes->PortionsToEvict.swap(evicted); + return newBlobs; +} + } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index dca2e6afc1..a7faeb2bd2 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -141,7 +141,7 @@ public: const TSnapshot& outdatedSnapshot) override; std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop) override; - std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTtlInfo>& pathTtls) override; + std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) override; bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) override; void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override; @@ -164,6 +164,9 @@ public: /// @note called from CompactionActor static TVector<TString> CompactBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes); + /// @note called from EvictionActor + static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes); + static ui64 ExtractKey(const TString& key) { Y_VERIFY(key.size() == 8); 
return *reinterpret_cast<const ui64*>(key.data()); diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 63cca9a6b9..20f2b3ea0b 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -54,6 +54,16 @@ GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const TVector<ui32> struct TInsertedData; +struct TCompression { + arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME}; + std::optional<int> Level; +}; + +struct TStorageTier { + TString Name; + std::optional<TCompression> Compression; +}; + /// Column engine index description in terms of tablet's local table. /// We have to use YDB types for keys here. struct TIndexInfo : public NTable::TScheme::TTableSchema { @@ -189,10 +199,45 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { static bool IsSpecialColumn(const arrow::Field& field); static std::vector<std::shared_ptr<arrow::Array>> MakeSpecialColumns(const TInsertedData& blob, ui64 size); - arrow::Compression::type CompressionCodec() const { return DefaultCompressionCodec; } - void SetDefaultCompressionCodec(arrow::Compression::type codec) { DefaultCompressionCodec = codec; } - std::optional<int> CompressionLevel() const { return DefaultCompressionLevel; } - void SetDefaultCompressionLevel(const std::optional<int>& level = {}) { DefaultCompressionLevel = level; } + void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; } + const TCompression& GetDefaultCompression() const { return DefaultCompression; } + + std::optional<TCompression> GetTierCompression(ui32 tierNo) const { + if (!Tiers.empty()) { + Y_VERIFY(tierNo < Tiers.size()); + return Tiers[tierNo].Compression; + } + return {}; + } + + std::optional<TCompression> GetTierCompression(const TString& tierName) const { + if (tierName.empty()) { + return {}; + } + ui32 tierNo = GetTierNumber(tierName); + Y_VERIFY(tierNo != Max<ui32>()); + return GetTierCompression(tierNo); + } + + TString GetTierName(ui32 tierNo) const { + if (!Tiers.empty()) { + Y_VERIFY(tierNo < Tiers.size()); + return Tiers[tierNo].Name; + } + return {}; + } + + void AddStorageTier(TStorageTier&& tier) { + TierByName[tier.Name] = Tiers.size(); + Tiers.emplace_back(std::move(tier)); + } + + ui32 GetTierNumber(const TString& tierName) const { + if (auto it = TierByName.find(tierName); it != TierByName.end()) { + return it->second; + } + return Max<ui32>(); + } private: ui32 Id; @@ -205,8 +250,9 @@ private: std::shared_ptr<arrow::Schema> IndexKey; THashSet<TString> RequiredColumns; THashSet<ui32> MinMaxIdxColumnsIds; - arrow::Compression::type DefaultCompressionCodec{arrow::Compression::LZ4_FRAME}; - std::optional<int> DefaultCompressionLevel; + TCompression DefaultCompression; + std::vector<TStorageTier> Tiers; + THashMap<TString, ui32> TierByName; void AddRequiredColumns(const TVector<TString>& columns) { for (auto& name: columns) { diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index e67ab39605..15ae8968be 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -3,25 +3,20 @@ namespace NKikimr::NOlap { -namespace { - -TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, - const std::shared_ptr<arrow::Field>& field, - const arrow::ipc::IpcWriteOptions& writeOptions, int64_t& memSize) { - +TString TPortionInfo::SerializeColumn(const 
std::shared_ptr<arrow::Array>& array, + const std::shared_ptr<arrow::Field>& field, + const arrow::ipc::IpcWriteOptions& writeOptions) +{ std::vector<std::shared_ptr<arrow::Field>> tmp{field}; auto schema = std::make_shared<arrow::Schema>(tmp); auto batch = arrow::RecordBatch::Make(schema, array->length(), {array}); Y_VERIFY(batch); -#if 0 - auto status = GetRecordBatchSize(*batch, arrow::ipc::IpcWriteOptions::Defaults(), &memSize); - Y_VERIFY(status.ok()); -#else - Y_UNUSED(memSize); -#endif + return NArrow::SerializeBatch(batch, writeOptions); } +namespace { + std::shared_ptr<arrow::ChunkedArray> DeserializeBlobs(const TVector<TString>& blobs, std::shared_ptr<arrow::Field> field) { Y_VERIFY(!blobs.empty()); std::vector<std::shared_ptr<arrow::Field>> tmp{field}; @@ -46,8 +41,7 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr TColumnRecord&& record, const arrow::ipc::IpcWriteOptions& writeOptions, ui32 limitBytes) { - int64_t memSize = 0; - auto blob = SerializeColumn(array, field, writeOptions, memSize); + auto blob = SerializeColumn(array, field, writeOptions); if (blob.size() >= limitBytes) { return {}; } @@ -124,7 +118,11 @@ void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second); } -void TPortionInfo::AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch) { +void TPortionInfo::AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch, + const TString& tierName) { + TierName = tierName; + Meta = {}; + /// @note It does not add RawBytes info for snapshot columns, only for user ones. for (auto& [columnId, col] : indexInfo.Columns) { auto column = batch->GetColumnByName(col.Name); @@ -165,23 +163,32 @@ TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const { } if (rec.ColumnId == FirstPkColumn) { + auto* portionMeta = meta.MutablePortionMeta(); + switch (Meta.Produced) { case TPortionMeta::UNSPECIFIED: Y_VERIFY(false); case TPortionMeta::INSERTED: - meta.MutablePortionMeta()->SetIsInserted(true); + portionMeta->SetIsInserted(true); break; case TPortionMeta::COMPACTED: - meta.MutablePortionMeta()->SetIsCompacted(true); + portionMeta->SetIsCompacted(true); break; case TPortionMeta::SPLIT_COMPACTED: - meta.MutablePortionMeta()->SetIsSplitCompacted(true); + portionMeta->SetIsSplitCompacted(true); + break; + case TPortionMeta::EVICTED: + portionMeta->SetIsEvicted(true); break; case TPortionMeta::INACTIVE: Y_FAIL("Unexpected inactive case"); - //meta.MutablePortionMeta()->SetInactive(true); + //portionMeta->SetInactive(true); break; } + + if (!TierName.empty()) { + portionMeta->SetTierName(TierName); + } } TString out; @@ -203,12 +210,16 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord if (meta.HasPortionMeta()) { auto& portionMeta = meta.GetPortionMeta(); + TierName = portionMeta.GetTierName(); + if (portionMeta.GetIsInserted()) { Meta.Produced = TPortionMeta::INSERTED; } else if (portionMeta.GetIsCompacted()) { Meta.Produced = TPortionMeta::COMPACTED; } else if (portionMeta.GetIsSplitCompacted()) { Meta.Produced = TPortionMeta::SPLIT_COMPACTED; + } else if (portionMeta.GetIsEvicted()) { + Meta.Produced = TPortionMeta::EVICTED; } } if (meta.HasNumRows()) { diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 884610037e..fa35d36530 100644 --- 
a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -12,7 +12,8 @@ struct TPortionMeta { INSERTED = 1, COMPACTED = 2, SPLIT_COMPACTED = 3, - INACTIVE = 4 + INACTIVE = 4, + EVICTED = 5, }; struct TColumnMeta { @@ -42,9 +43,12 @@ struct TPortionMeta { }; struct TPortionInfo { + static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024; + TVector<TColumnRecord> Records; TPortionMeta Meta; ui32 FirstPkColumn = 0; + TString TierName; bool Empty() const { return Records.empty(); } bool Valid() const { return !Empty() && Meta.Produced != TPortionMeta::UNSPECIFIED && HasMinMax(FirstPkColumn); } @@ -112,6 +116,13 @@ struct TPortionInfo { } } + void UpdateRecordsMeta(TPortionMeta::EProduced produced) { + Meta.Produced = produced; + for (auto& record : Records) { + record.Metadata = GetMetadata(record); + } + } + void SetStale(const TSnapshot& snapshot) { for (auto& rec : Records) { rec.SetXSnapshot(snapshot); @@ -125,7 +136,8 @@ struct TPortionInfo { TString GetMetadata(const TColumnRecord& rec) const; void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec); - void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch); + void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch, + const TString& tierName); void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const; std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const; @@ -176,11 +188,15 @@ struct TPortionInfo { const std::shared_ptr<arrow::Schema>& schema, const THashMap<TBlobRange, TString>& data) const; + static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, + const std::shared_ptr<arrow::Field>& field, + const arrow::ipc::IpcWriteOptions& writeOptions); + TString AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, TColumnRecord&& record, const arrow::ipc::IpcWriteOptions& writeOptions, - ui32 limitBytes = 8 * 1024 * 1024); + ui32 limitBytes = BLOB_BYTES_LIMIT); friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) { for (auto& rec : info.Records) { @@ -188,6 +204,9 @@ struct TPortionInfo { out << " (1 of " << info.Records.size() << " blobs shown)"; break; } + if (!info.TierName.empty()) { + out << " tier: " << info.TierName; + } out << " " << info.Meta; return out; } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 7bdfb5b7bd..42f1544007 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -225,9 +225,8 @@ public: return batch; } - static NOlap::TTtlInfo MakeTtl(ui64 ts) { - auto scalar = std::make_shared<arrow::TimestampScalar>(ts, arrow::timestamp(arrow::TimeUnit::MICRO)); - return NOlap::TTtlInfo{testColumns[0].first, scalar}; + static NOlap::TTiersInfo MakeTtl(TInstant border) { + return NOlap::TTiersInfo(testColumns[0].first, border); } private: @@ -341,7 +340,7 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u } bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db, - const THashMap<ui64, NOlap::TTtlInfo>& pathTtls, ui32 expectedToDrop) { + const THashMap<ui64, NOlap::TTiersInfo>& pathTtls, ui32 expectedToDrop) { std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathTtls); UNIT_ASSERT(changes); 
     UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop);
@@ -649,8 +648,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
     }
 
     // TTL
-    THashMap<ui64, NOlap::TTtlInfo> pathTtls;
-    pathTtls.emplace(pathId, TBuilder::MakeTtl(10000));
+    THashMap<ui64, NOlap::TTiersInfo> pathTtls;
+    pathTtls.emplace(pathId, TBuilder::MakeTtl(TInstant::MicroSeconds(10000)));
     Ttl(engine, db, pathTtls, 2);
 
     // read + load + read
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
new file mode 100644
index 0000000000..da752e96d2
--- /dev/null
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -0,0 +1,151 @@
+#include "columnshard_impl.h"
+#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
+#include "blob_cache.h"
+
+namespace NKikimr::NColumnShard {
+
+using NOlap::TBlobRange;
+
+class TEvictionActor : public TActorBootstrapped<TEvictionActor> {
+public:
+    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+        return NKikimrServices::TActivity::TX_COLUMNSHARD_EVICTION_ACTOR;
+    }
+
+    TEvictionActor(ui64 tabletId, const TActorId& parent)
+        : TabletId(tabletId)
+        , Parent(parent)
+        , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
+    {}
+
+    void Handle(TEvPrivate::TEvEviction::TPtr& ev, const TActorContext& ctx) {
+        auto& event = *ev->Get();
+        TxEvent = std::move(event.TxEvent);
+        Y_VERIFY(TxEvent);
+        Y_VERIFY(Blobs.empty() && !NumRead);
+
+        auto& indexChanges = TxEvent->IndexChanges;
+        Y_VERIFY(indexChanges);
+
+        LOG_S_DEBUG("Portions eviction: " << *indexChanges << " at tablet " << TabletId);
+
+        auto& evictedPortions = indexChanges->PortionsToEvict;
+        Y_VERIFY(evictedPortions.size());
+
+        for (auto& [portionInfo, tierName] : evictedPortions) {
+            Y_VERIFY(!portionInfo.Empty());
+            std::vector<NBlobCache::TBlobRange> ranges;
+            for (auto& rec : portionInfo.Records) {
+                auto& blobRange = rec.BlobRange;
+                Blobs[blobRange] = {};
+                // Group only ranges from the same blob into one request
+                if (!ranges.empty() && ranges.back().BlobId != blobRange.BlobId) {
+                    SendReadRequest(ctx, std::move(ranges));
+                    ranges = {};
+                }
+                ranges.push_back(blobRange);
+            }
+            SendReadRequest(ctx, std::move(ranges));
+        }
+    }
+
+    void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) {
+        LOG_S_TRACE("TEvReadBlobRangeResult (got " << NumRead << " of " << Blobs.size()
+            << ") at tablet " << TabletId << " (eviction)");
+
+        auto& event = *ev->Get();
+        const TBlobRange& blobId = event.BlobRange;
+        Y_VERIFY(Blobs.count(blobId));
+        if (!Blobs[blobId].empty()) {
+            return;
+        }
+
+        if (event.Status == NKikimrProto::EReplyStatus::OK) {
+            Y_VERIFY(event.Data.size());
+
+            TString blobData = event.Data;
+            Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size);
+            Blobs[blobId] = blobData;
+        } else {
+            LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << event.Status
+                << " at tablet " << TabletId << " (eviction)");
+            TxEvent->PutStatus = event.Status;
+            if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) {
+                TxEvent->PutStatus = NKikimrProto::ERROR;
+            }
+        }
+
+        ++NumRead;
+        if (NumRead == Blobs.size()) {
+            EvictPortions(ctx);
+            Clear();
+        }
+    }
+
+    void Bootstrap(const TActorContext& ctx) {
+        Y_UNUSED(ctx);
+        Become(&TThis::StateWait);
+    }
+
+    STFUNC(StateWait) {
+        switch (ev->GetTypeRewrite()) {
+            HFunc(TEvPrivate::TEvEviction, Handle);
+            HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+            default:
+                break;
+        }
+    }
+
+private:
+    ui64 TabletId;
+    TActorId Parent;
+    TActorId BlobCacheActorId;
+    std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
+    THashMap<TBlobRange, TString> Blobs;
+    ui32 NumRead{0};
+
+    void Clear() {
+        Blobs.clear();
+        NumRead = 0;
+    }
+
+    void SendReadRequest(const TActorContext&, std::vector<NBlobCache::TBlobRange>&& ranges) {
+        if (ranges.empty()) {
+            return;
+        }
+
+        Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false));
+    }
+
+    void EvictPortions(const TActorContext& ctx) {
+        Y_VERIFY(TxEvent);
+        if (TxEvent->PutStatus != NKikimrProto::EReplyStatus::UNKNOWN) {
+            LOG_S_INFO("Portions eviction not started at tablet " << TabletId);
+            ctx.Send(Parent, TxEvent.release());
+            return;
+        }
+
+        LOG_S_DEBUG("Portions eviction started at tablet " << TabletId);
+        {
+            TCpuGuard guard(TxEvent->ResourceUsage);
+
+            TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
+
+            TxEvent->Blobs = NOlap::TColumnEngineForLogs::EvictBlobs(TxEvent->IndexInfo, TxEvent->IndexChanges);
+            if (TxEvent->Blobs.empty()) {
+                TxEvent->PutStatus = NKikimrProto::OK;
+            }
+        }
+        ui32 blobsSize = TxEvent->Blobs.size();
+        ctx.Send(Parent, TxEvent.release());
+
+        LOG_S_DEBUG("Portions eviction finished (" << blobsSize << " new blobs) at tablet " << TabletId);
+        //Die(ctx); // It's alive till tablet's death
+    }
+};
+
+IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent) {
+    return new TEvictionActor(tabletId, parent);
+}
+
+}
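Aside, not part of the patch: the read-request batching in TEvictionActor::Handle(TEvPrivate::TEvEviction) relies on a portion's Records being ordered so that ranges belonging to the same blob are adjacent, and it flushes a batch whenever the blob id changes. A self-contained sketch of that grouping logic, with simplified stand-in types instead of NBlobCache::TBlobRange, is:

#include <cstdint>
#include <string>
#include <vector>

struct TBlobRangeLite {          // simplified stand-in for NBlobCache::TBlobRange
    std::string BlobId;
    uint64_t Offset = 0;
    uint64_t Size = 0;
};

// Split a run of ranges into batches, one batch per contiguous run of the same blob id,
// mirroring the flush-on-blob-change loop above.
std::vector<std::vector<TBlobRangeLite>> GroupByBlob(const std::vector<TBlobRangeLite>& records) {
    std::vector<std::vector<TBlobRangeLite>> batches;
    std::vector<TBlobRangeLite> current;
    for (const auto& range : records) {
        if (!current.empty() && current.back().BlobId != range.BlobId) {
            batches.push_back(std::move(current));   // next range belongs to another blob: flush
            current = {};
        }
        current.push_back(range);
    }
    if (!current.empty()) {
        batches.push_back(std::move(current));       // flush the trailing batch
    }
    return batches;
}

Each resulting batch corresponds to one TEvReadBlobRangeBatch request sent to the blob cache.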
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 00e3143fc6..e2723fafb9 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -1119,7 +1119,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
         auto batchStats = scan->ArrowBatch;
         UNIT_ASSERT(batchStats);
         // Cerr << batchStats->ToString() << Endl;
-        UNIT_ASSERT_VALUES_EQUAL(batchStats->num_rows(), 4);
+        UNIT_ASSERT_VALUES_EQUAL(batchStats->num_rows(), 5);
 
         for (ui32 i = 0; i < batchStats->num_rows(); ++i) {
             auto paths = batchStats->GetColumnByName("PathId");
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index d2bb656e92..1511c3219b 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -42,18 +42,24 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s
     return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS);
 }
 
-bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
-               const std::string& columnName, i64 seconds) {
+std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TString& blob, const TString& srtSchema,
+                                                          const std::string& columnName)
+{
     auto schema = NArrow::DeserializeSchema(srtSchema);
     auto batch = NArrow::DeserializeBatch(blob, schema);
     UNIT_ASSERT(batch);
 
+    std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
+    UNIT_ASSERT(array);
+    return std::static_pointer_cast<arrow::TimestampArray>(array);
+}
+
+bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
+               const std::string& columnName, i64 seconds) {
     auto expected = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO));
     UNIT_ASSERT_VALUES_EQUAL(expected.value, seconds * 1000 * 1000);
 
-    std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
-    UNIT_ASSERT(array);
-    auto tsCol = std::static_pointer_cast<arrow::TimestampArray>(array);
+    auto tsCol = GetTimestampColumn(blob, srtSchema, columnName);
     UNIT_ASSERT(tsCol);
     UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize);
 
@@ -67,7 +73,32 @@ bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
     return true;
 }
 
-void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}) {
+std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize) {
+    UNIT_ASSERT(ts.size() == 2);
+
+    TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema);
+    UNIT_ASSERT(data1.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
+    UNIT_ASSERT(data1.size() < 7 * 1024 * 1024);
+
+    TString data2 = MakeTestBlob({overlapSize, overlapSize + portionSize}, testYdbSchema);
+    UNIT_ASSERT(data2.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
+    UNIT_ASSERT(data2.size() < 7 * 1024 * 1024);
+
+    auto schema = NArrow::MakeArrowSchema(testYdbSchema);
+    auto batch1 = UpdateColumn(NArrow::DeserializeBatch(data1, schema), TTestSchema::DefaultTtlColumn, ts[0]);
+    auto batch2 = UpdateColumn(NArrow::DeserializeBatch(data2, schema), TTestSchema::DefaultTtlColumn, ts[1]);
+
+    std::vector<TString> data;
+    data.emplace_back(NArrow::SerializeBatchNoCompression(batch1));
+    data.emplace_back(NArrow::SerializeBatchNoCompression(batch2));
+    return data;
+}
+
+// ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
+// ts[1] = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021
+void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
+             std::vector<ui64> ts = {1600000000, 1620000000})
+{
     TTestBasicRuntime runtime;
     TTester::Setup(runtime);
 
@@ -88,12 +119,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
     ui64 planStep = 1000000000; // greater then delays
     ui64 txId = 100;
 
-    ui64 ts1 = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
-    ui64 ts2 = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021
+    UNIT_ASSERT(ts.size() == 2);
 
     ui32 ttlSec = TInstant::Now().Seconds(); // disable internal tll
     if (internal) {
-        ttlSec -= (ts1 + ts2) / 2; // enable internal ttl between ts1 and ts2
+        ttlSec -= (ts[0] + ts[1]) / 2; // enable internal ttl between ts1 and ts2
     }
     if (spec.HasTiers()) {
         spec.Tiers[0].SetTtl(ttlSec);
@@ -108,31 +138,14 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
 
     //
 
-    static const ui32 portionSize = 80 * 1000;
-    static const ui32 overlapSize = 40 * 1000;
-
-    TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema);
-    UNIT_ASSERT(data1.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
-    UNIT_ASSERT(data1.size() < 7 * 1024 * 1024);
-
-    TString data2 = MakeTestBlob({overlapSize, overlapSize + portionSize}, testYdbSchema);
-    UNIT_ASSERT(data2.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
-    UNIT_ASSERT(data2.size() < 7 * 1024 * 1024);
-
-    auto schema = NArrow::MakeArrowSchema(testYdbSchema);
-    auto batch1 = NArrow::DeserializeBatch(data1, schema);
-    auto batch2 = NArrow::DeserializeBatch(data2, schema);
-
-    data1 = NArrow::SerializeBatchNoCompression(UpdateColumn(batch1, TTestSchema::DefaultTtlColumn, ts1));
-    data2 = NArrow::SerializeBatchNoCompression(UpdateColumn(batch2, TTestSchema::DefaultTtlColumn, ts2));
-
-    UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data1));
-    ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
-    PlanCommit(runtime, sender, ++planStep, txId);
-
-    UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data2));
-    ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
-    PlanCommit(runtime, sender, ++planStep, txId);
+    ui32 portionSize = 80 * 1000;
+    auto blobs = MakeData(ts, portionSize, portionSize / 2);
+    UNIT_ASSERT_EQUAL(blobs.size(), 2);
+    for (auto& data : blobs) {
+        UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
+        ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+        PlanCommit(runtime, sender, ++planStep, txId);
+    }
 
     // TODO: write into path 2 (no ttl)
 
@@ -143,7 +156,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
     if (internal) {
         TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
     } else {
-        TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts1 + 1);
+        TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1);
     }
 
     TAutoPtr<IEventHandle> handle;
@@ -170,11 +183,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
         UNIT_ASSERT(resRead.GetData().size() > 0);
 
         auto& schema = resRead.GetMeta().GetSchema();
-        UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts2));
+        UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts[1]));
     }
 
     // Alter TTL
-    ttlSec = TInstant::Now().Seconds() - (ts2 + 1);
+    ttlSec = TInstant::Now().Seconds() - (ts[1] + 1);
     if (spec.HasTiers()) {
         spec.Tiers[0].SetTtl(ttlSec);
     } else {
@@ -189,7 +202,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
     if (internal) {
         TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
     } else {
-        TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts2 + 1);
+        TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1);
    }
 
     {
@@ -217,14 +230,14 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
     UNIT_ASSERT(ok);
     PlanSchemaTx(runtime, sender, {planStep, txId});
 
-    UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data1));
+    UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0]));
     ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
     PlanCommit(runtime, sender, ++planStep, txId);
 
     if (internal) {
         TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
     } else {
-        TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts1 - 1);
+        TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1);
     }
 
     {
@@ -245,8 +258,151 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
         UNIT_ASSERT(resRead.GetData().size() > 0);
 
         auto& schema = resRead.GetMeta().GetSchema();
-        UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts1));
+        UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts[0]));
+    }
+}
+
+std::vector<std::pair<std::shared_ptr<arrow::TimestampArray>, ui64>>
+TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTestSchema::TTableSpecials>& specs) {
+    TTestBasicRuntime runtime;
+    TTester::Setup(runtime);
+
+    TActorId sender = runtime.AllocateEdgeActor();
+    CreateTestBootstrapper(runtime,
+                           CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::COLUMNSHARD),
+                           &CreateColumnShard);
+
+    TDispatchOptions options;
+    options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+    runtime.DispatchEvents(options);
+
+    //
+
+    ui64 metaShard = TTestTxConfig::TxTablet1;
+    ui64 writeId = 0;
+    ui64 tableId = 1;
+    ui64 planStep = 1000000000; // greater then delays
+    ui64 txId = 100;
+
+    UNIT_ASSERT(specs.size() > 0);
+    bool ok = ProposeSchemaTx(runtime, sender,
+                              TTestSchema::CreateTableTxBody(tableId, testYdbSchema, specs[0]),
+                              {++planStep, ++txId});
+    UNIT_ASSERT(ok);
+    PlanSchemaTx(runtime, sender, {planStep, txId});
+
+    for (auto& data : blobs) {
+        UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
+        ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+        PlanCommit(runtime, sender, ++planStep, txId);
+    }
+
+    if (reboots) {
+        RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
+    }
+
+    TAutoPtr<IEventHandle> handle;
+
+    std::vector<std::pair<std::shared_ptr<arrow::TimestampArray>, ui64>> resColumns;
+    resColumns.reserve(specs.size());
+
+    for (ui32 i = 0; i < specs.size(); ++i) {
+        if (i) {
+            ui32 version = i + 1;
+            ok = ProposeSchemaTx(runtime, sender,
+                                 TTestSchema::AlterTableTxBody(tableId, version, specs[i]),
+                                 {++planStep, ++txId});
+            UNIT_ASSERT(ok);
+            PlanSchemaTx(runtime, sender, {planStep, txId});
+        }
+
+        TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
+
+        // Read
+
+        --planStep;
+        auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
+        Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn);
+
+        ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
+        auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
+        UNIT_ASSERT(event);
+
+        auto& resRead = Proto(event);
+        UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
+        UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
+        UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+        UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
+        UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
+
+        if (resRead.GetData().size()) {
+            auto& meta = resRead.GetMeta();
+            auto& schema = meta.GetSchema();
+            auto tsColumn = GetTimestampColumn(resRead.GetData(), schema, TTestSchema::DefaultTtlColumn);
+            UNIT_ASSERT(tsColumn);
+
+            UNIT_ASSERT(meta.HasReadStats());
+            auto& readStats = meta.GetReadStats();
+            ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage
+
+            resColumns.emplace_back(tsColumn, numBytes);
+        } else {
+            resColumns.emplace_back(nullptr, 0);
+        }
+    }
+
+    return resColumns;
+}
+
+void TestTiersT1(bool reboots) {
+    std::vector<ui64> ts = {1600000000, 1620000000};
+    ui64 nowSec = TInstant::Now().Seconds();
+
+    TTestSchema::TTableSpecials spec;
+    spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0"));
+    spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1"));
+    spec.Tiers.back().SetCodec("zstd");
+
+    std::vector<TTestSchema::TTableSpecials> alters(4, spec);
+
+    ui64 allowBoth = nowSec - ts[0] + 600;
+    ui64 allowOne = nowSec - ts[1] + 600;
+    ui64 allowNone = nowSec - ts[1] - 600;
+
+    alters[0].Tiers[0].SetTtl(allowBoth); // tier0 allows/has: data[0], data[1]
+    alters[0].Tiers[1].SetTtl(allowBoth); // tier1 allows: data[0], data[1], has: nothing
+
+    alters[1].Tiers[0].SetTtl(allowOne);  // tier0 allows/has: data[1]
+    alters[1].Tiers[1].SetTtl(allowBoth); // tier1 allows: data[0], data[1], has: data[0]
+
+    alters[2].Tiers[0].SetTtl(allowNone); // tier0 allows/has: nothing
+    alters[2].Tiers[1].SetTtl(allowOne);  // tier1 allows/has: data[1]
+
+    alters[3].Tiers[0].SetTtl(allowNone); // tier0 allows/has: nothing
+    alters[3].Tiers[1].SetTtl(allowNone); // tier1 allows/has: nothing
+
+    ui32 portionSize = 80 * 1000;
+    ui32 overlapSize = 40 * 1000;
+    std::vector<TString> blobs = MakeData(ts, portionSize, overlapSize);
+
+    auto columns = TestTiers(reboots, blobs, alters);
+
+    UNIT_ASSERT_EQUAL(columns.size(), 4);
+    UNIT_ASSERT(columns[0].first);
+    UNIT_ASSERT(columns[1].first);
+    UNIT_ASSERT(columns[2].first);
+    UNIT_ASSERT(!columns[3].first);
+    UNIT_ASSERT(columns[0].second);
+    UNIT_ASSERT(columns[1].second);
+    UNIT_ASSERT(columns[2].second);
+    UNIT_ASSERT(!columns[3].second);
+
+    UNIT_ASSERT_EQUAL(columns[0].first->length(), 2 * portionSize - overlapSize);
+    UNIT_ASSERT_EQUAL(columns[0].first->length(), columns[1].first->length());
+    UNIT_ASSERT_EQUAL(columns[2].first->length(), portionSize);
+
+    Cerr << "read bytes: " << columns[0].second << ", " << columns[1].second << ", " << columns[2].second << "\n";
+    UNIT_ASSERT_GT(columns[0].second, columns[1].second);
 }
 
 void TestDrop(bool reboots) {
@@ -380,6 +536,15 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
         TestTtl(true, false, specs);
     }
 
+    Y_UNIT_TEST(Tiers) {
+        TestTiersT1(false);
+    }
+
+    Y_UNIT_TEST(RebootTiers) {
+        NColumnShard::gAllowLogBatchingDefaultValue = false;
+        TestTiersT1(true);
+    }
+
     Y_UNIT_TEST(Drop) {
         TestDrop(false);
     }
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index a02d84e73f..1a96ba7b44 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -213,7 +213,17 @@ public:
         TString accumulatedBlob;
         TVector<std::pair<size_t, TString>> recordsInBlob;
 
-        for (auto& portionInfo : indexChanges->AppendedPortions) {
+        size_t portionsToWrite = indexChanges->AppendedPortions.size();
+        bool appended = true;
+        if (indexChanges->PortionsToEvict.size()) {
+            Y_VERIFY(portionsToWrite == 0);
+            portionsToWrite = indexChanges->PortionsToEvict.size();
+            appended = false;
+        }
+
+        for (size_t pos = 0; pos < portionsToWrite; ++pos) {
+            auto& portionInfo = appended ? indexChanges->AppendedPortions[pos]
+                                         : indexChanges->PortionsToEvict[pos].first;
             auto& records = portionInfo.Records;
 
             accumulatedBlob.clear();
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index c4c857fe7d..e1a38023b3 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -23,6 +23,7 @@ SRCS(
     columnshard_impl.cpp
     columnshard_common.cpp
     compaction_actor.cpp
+    eviction_actor.cpp
     indexing_actor.cpp
     read_actor.cpp
     write_actor.cpp
diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp
index b3aea0b90f..2a712d3b0f 100644
--- a/ydb/core/tx/schemeshard/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap.cpp
@@ -538,4 +538,9 @@ Y_UNIT_TEST_SUITE(TOlap) {
             }
         )", {NKikimrScheme::StatusInvalidParameter});
     }
+
+    // TODO: AlterTiers
+    // negatives for store: disallow alters
+    // negatives for table: wrong tiers count, wrong tiers, wrong eviction column, wrong eviction values,
+    //                      different TTL columns in tiers
 }
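Aside, not part of the patch: the TTL borders chosen in TestTiersT1 follow from the rule that a row with timestamp t survives a tier whose TTL is T seconds iff t >= now - T. With ts[0] = 1600000000 and ts[1] = 1620000000, allowBoth = now - ts[0] + 600 keeps both rows, allowOne = now - ts[1] + 600 keeps only the newer row, and allowNone = now - ts[1] - 600 keeps neither, which is what the asserts on columns[0..3] check. A quick self-contained check of that arithmetic (helper name hypothetical, not from the test):

#include <cassert>
#include <cstdint>
#include <ctime>

// True if a row with timestamp rowTs (seconds) survives a TTL of ttlSec seconds.
bool SurvivesTtl(uint64_t rowTs, uint64_t nowSec, uint64_t ttlSec) {
    return rowTs >= nowSec - ttlSec;   // eviction border is now - ttl
}

int main() {
    const uint64_t ts0 = 1600000000;   // older row
    const uint64_t ts1 = 1620000000;   // newer row
    const uint64_t now = std::time(nullptr);

    const uint64_t allowBoth = now - ts0 + 600;
    const uint64_t allowOne  = now - ts1 + 600;
    const uint64_t allowNone = now - ts1 - 600;

    assert(SurvivesTtl(ts0, now, allowBoth) && SurvivesTtl(ts1, now, allowBoth));   // both kept
    assert(!SurvivesTtl(ts0, now, allowOne) && SurvivesTtl(ts1, now, allowOne));    // only newer kept
    assert(!SurvivesTtl(ts0, now, allowNone) && !SurvivesTtl(ts1, now, allowNone)); // nothing kept
    return 0;
}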