author     Artem Zuikov <chertus@gmail.com>              2022-02-24 15:29:04 +0300
committer  Daniil Cherednik <dcherednik@yandex-team.ru>  2022-02-24 15:29:04 +0300
commit     d552a8fa6623c4e1a0f4d1e10c6165449f63e4b4 (patch)
tree       d2d3b970a01d00ef933bbc9b55e0922b69cf454b
parent     e57c847491f07f4e397c63085c808486d3659404 (diff)
KIKIMR-14189 ColumnShard tiered recompaction (Cherry pick commit r9171943)
REVIEW: 2347385
x-ydb-stable-ref: 20fabfaafa4abff449cd20f5c445cc06bf3208c7
-rw-r--r--  ydb/core/kqp/ut/kqp_olap_ut.cpp                                 51
-rw-r--r--  ydb/core/protos/counters_columnshard.proto                       8
-rw-r--r--  ydb/core/protos/services.proto                                   1
-rw-r--r--  ydb/core/protos/tx_columnshard.proto                             2
-rw-r--r--  ydb/core/tx/columnshard/columnshard.cpp                          9
-rw-r--r--  ydb/core/tx/columnshard/columnshard__init.cpp                    2
-rw-r--r--  ydb/core/tx/columnshard/columnshard__propose_transaction.cpp   19
-rw-r--r--  ydb/core/tx/columnshard/columnshard__stats_scan.h               33
-rw-r--r--  ydb/core/tx/columnshard/columnshard__write_index.cpp            23
-rw-r--r--  ydb/core/tx/columnshard/columnshard_impl.cpp                    80
-rw-r--r--  ydb/core/tx/columnshard/columnshard_impl.h                       8
-rw-r--r--  ydb/core/tx/columnshard/columnshard_ttl.h                       73
-rw-r--r--  ydb/core/tx/columnshard/columnshard_txs.h                       17
-rw-r--r--  ydb/core/tx/columnshard/columnshard_ut_common.h                  7
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine.h                 51
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp         171
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.h             5
-rw-r--r--  ydb/core/tx/columnshard/engines/index_info.h                    58
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.cpp                49
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.h                  25
-rw-r--r--  ydb/core/tx/columnshard/engines/ut_logs_engine.cpp              11
-rw-r--r--  ydb/core/tx/columnshard/eviction_actor.cpp                     151
-rw-r--r--  ydb/core/tx/columnshard/ut_columnshard_read_write.cpp            2
-rw-r--r--  ydb/core/tx/columnshard/ut_columnshard_schema.cpp              247
-rw-r--r--  ydb/core/tx/columnshard/write_actor.cpp                         12
-rw-r--r--  ydb/core/tx/columnshard/ya.make                                  1
-rw-r--r--  ydb/core/tx/schemeshard/ut_olap.cpp                              5
27 files changed, 912 insertions(+), 209 deletions(-)
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 908fc34a13..0b4b65aac4 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -1134,6 +1134,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
TKikimrRunner kikimr(settings);
+ static ui32 numKinds = 5;
CreateTestOlapTable(kikimr);
for (ui64 i = 0; i < 100; ++i) {
@@ -1151,7 +1152,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_VALUES_EQUAL(rows.size(), 4*3);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), numKinds*3);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 3ull);
UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), 1ull);
UNIT_ASSERT_GE(GetUint64(rows[0].at("TabletId")), 72075186224037888ull);
@@ -1175,6 +1176,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
TKikimrRunner kikimr(settings);
+ static ui32 numKinds = 5;
CreateTestOlapTable(kikimr, "olapTable_1");
CreateTestOlapTable(kikimr, "olapTable_2");
@@ -1195,9 +1197,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_GT(rows.size(), 1*4);
- UNIT_ASSERT_LE(rows.size(), 3*4);
- UNIT_ASSERT_VALUES_EQUAL(rows.size() % 4, 0); // 4 Kinds
+ UNIT_ASSERT_GT(rows.size(), 1*numKinds);
+ UNIT_ASSERT_LE(rows.size(), 3*numKinds);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size() % numKinds, 0);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.front().at("PathId")), 3ull);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.back().at("PathId")), 3ull);
}
@@ -1210,9 +1212,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_GT(rows.size(), 1*4);
- UNIT_ASSERT_LE(rows.size(), 3*4);
- UNIT_ASSERT_VALUES_EQUAL(rows.size() % 4, 0); // 4 Kinds
+ UNIT_ASSERT_GT(rows.size(), 1*numKinds);
+ UNIT_ASSERT_LE(rows.size(), 3*numKinds);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size() % numKinds, 0);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.front().at("PathId")), 4ull);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.back().at("PathId")), 4ull);
}
@@ -1236,6 +1238,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
TKikimrRunner kikimr(settings);
+ static ui32 numKinds = 5;
CreateTestOlapTable(kikimr);
for (ui64 i = 0; i < 10; ++i) {
@@ -1271,7 +1274,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*4);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*numKinds);
UNIT_ASSERT_LE(GetUint64(rows[0].at("Bytes")), GetUint64(rows[1].at("Bytes")));
}
{
@@ -1282,7 +1285,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*4);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*numKinds);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("Rows2")), GetUint64(rows[0].at("Rows3")));
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("Rows")), GetUint64(rows[1].at("Rows3")));
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("Rows")), GetUint64(rows[2].at("Rows2")));
@@ -1296,6 +1299,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
TKikimrRunner kikimr(settings);
+ static ui32 numKinds = 5;
CreateTestOlapTable(kikimr, "olapTable_1");
CreateTestOlapTable(kikimr, "olapTable_2");
@@ -1344,11 +1348,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3*3*4);
+ ui32 numExpected = 3*3*numKinds;
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), numExpected);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 5ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), 4ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[35].at("PathId")), 3ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[35].at("Kind")), 1ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), numKinds);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected-1].at("PathId")), 3ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[numExpected-1].at("Kind")), 1ull);
}
{
@@ -1366,11 +1371,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_VALUES_EQUAL(rows.size(), 2*3*4);
+ ui32 numExpected = 2*3*numKinds;
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), numExpected);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 5ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), 4ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[23].at("PathId")), 3ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[23].at("Kind")), 1ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[0].at("Kind")), numKinds);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected-1].at("PathId")), 3ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint32(rows[numExpected-1].at("Kind")), 1ull);
}
}
@@ -1424,7 +1430,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
PRAGMA Kikimr.KqpPushOlapProcess = "true";
SELECT *
FROM `/Root/olapStore/.sys/store_primary_index_stats`
- WHERE Kind == UInt32("5")
+ WHERE Kind == UInt32("6")
ORDER BY PathId, Kind, TabletId;
)");
@@ -1444,7 +1450,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_GE(rows.size(), 3*2);
+ UNIT_ASSERT_GE(rows.size(), 3*3);
}
}
@@ -1453,6 +1459,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
TKikimrRunner kikimr(settings);
+ static ui32 numKinds = 5;
CreateTestOlapTable(kikimr, "olapTable_1");
CreateTestOlapTable(kikimr, "olapTable_2");
@@ -1589,7 +1596,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
// 3 Tables with 3 Shards each and 4 KindId-s of stats
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column0")), 3*3*4);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column0")), 3*3*numKinds);
}
{
@@ -1603,7 +1610,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column0")), 3ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column1")), 4ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column1")), numKinds);
UNIT_ASSERT_GE(GetUint64(rows[0].at("column2")), 3ull);
}
@@ -1619,7 +1626,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3ull);
for (ui64 pathId = 3, row = 0; pathId <= 5; ++pathId, ++row) {
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[row].at("PathId")), pathId);
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[row].at("column1")), 12);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[row].at("column1")), 3*numKinds);
}
}
}
diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto
index ada7786f48..8c42878bab 100644
--- a/ydb/core/protos/counters_columnshard.proto
+++ b/ydb/core/protos/counters_columnshard.proto
@@ -43,6 +43,11 @@ enum ESimpleCounters {
COUNTER_INACTIVE_BYTES = 33 [(CounterOpts) = {Name: "Index/InactiveBytes"}];
COUNTER_INACTIVE_RAW_BYTES = 34 [(CounterOpts) = {Name: "Index/InactiveBytesRaw"}];
COUNTER_SCAN_IN_FLY = 35 [(CounterOpts) = {Name: "ScanTxInFly"}];
+ COUNTER_EVICTED_PORTIONS = 36 [(CounterOpts) = {Name: "Index/EvictedPortions"}];
+ COUNTER_EVICTED_BLOBS = 37 [(CounterOpts) = {Name: "Index/EvictedBlobs"}];
+ COUNTER_EVICTED_ROWS = 38 [(CounterOpts) = {Name: "Index/EvictedRows"}];
+ COUNTER_EVICTED_BYTES = 39 [(CounterOpts) = {Name: "Index/EvictedBytes"}];
+ COUNTER_EVICTED_RAW_BYTES = 40 [(CounterOpts) = {Name: "Index/EvictedBytesRaw"}];
}
enum ECumulativeCounters {
@@ -110,6 +115,9 @@ enum ECumulativeCounters {
COUNTER_SMALL_BLOB_READ_BYTES = 61 [(CounterOpts) = {Name: "SmallBlobReadBytes"}];
COUNTER_SMALL_BLOB_DELETE_COUNT = 62 [(CounterOpts) = {Name: "SmallBlobDeleteCount"}];
COUNTER_SMALL_BLOB_DELETE_BYTES = 63 [(CounterOpts) = {Name: "SmallBlobDeleteBytes"}];
+ COUNTER_EVICTION_PORTIONS_WRITTEN = 64 [(CounterOpts) = {Name: "EvictionPortionsWritten"}];
+ COUNTER_EVICTION_BLOBS_WRITTEN = 65 [(CounterOpts) = {Name: "EvictionBlobsWritten"}];
+ COUNTER_EVICTION_BYTES_WRITTEN = 66 [(CounterOpts) = {Name: "EvictionBytesWritten"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 019cd5ed67..06a1f67278 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -794,6 +794,7 @@ message TActivity {
TX_COLUMNSHARD_READ_ACTOR = 497;
TX_COLUMNSHARD_INDEXING_ACTOR = 498;
TX_COLUMNSHARD_COMPACTION_ACTOR = 499;
+ TX_COLUMNSHARD_EVICTION_ACTOR = 505;
SCHEME_BOARD_MONITORING_ACTOR = 478;
SCHEME_BOARD_INFO_REQUESTER_ACTOR = 479;
METERING_WRITER_ACTOR = 480;
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index d407b2cc41..f79c256a5b 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -231,7 +231,9 @@ message TIndexPortionMeta {
bool IsInserted = 1;
bool IsCompacted = 2;
bool IsSplitCompacted = 3;
+ bool IsEvicted = 4;
}
+ optional string TierName = 5;
}
message TIndexColumnMeta {
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index cc4017b5bc..fd0c913ca3 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -13,6 +13,7 @@ namespace NKikimr::NColumnShard {
IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent);
IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent);
+IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent);
IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max());
@@ -27,6 +28,7 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx)
ctx.Send(Tablet(), new TEvents::TEvPoisonPill);
ctx.Send(IndexingActor, new TEvents::TEvPoisonPill);
ctx.Send(CompactionActor, new TEvents::TEvPoisonPill);
+ ctx.Send(EvictionActor, new TEvents::TEvPoisonPill);
}
void TColumnShard::SwitchToWork(const TActorContext& ctx) {
@@ -34,6 +36,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
LOG_S_INFO("Switched to work at " << TabletID() << " actor " << ctx.SelfID);
IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID));
CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID));
+ EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID));
SignalTabletActive(ctx);
}
@@ -308,6 +311,12 @@ void TColumnShard::UpdateIndexCounters() {
SetCounter(COUNTER_INACTIVE_ROWS, stats.Inactive.Rows);
SetCounter(COUNTER_INACTIVE_BYTES, stats.Inactive.Bytes);
SetCounter(COUNTER_INACTIVE_RAW_BYTES, stats.Inactive.RawBytes);
+
+ SetCounter(COUNTER_EVICTED_PORTIONS, stats.Evicted.Portions);
+ SetCounter(COUNTER_EVICTED_BLOBS, stats.Evicted.Blobs);
+ SetCounter(COUNTER_EVICTED_ROWS, stats.Evicted.Rows);
+ SetCounter(COUNTER_EVICTED_BYTES, stats.Evicted.Bytes);
+ SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.Evicted.RawBytes);
}
void TColumnShard::UpdateResourceMetrics(const TUsage& usage) {
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index a690aec811..1a5420361b 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -271,7 +271,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size());
if (!schemaPreset.empty()) {
- Self->SetPrimaryIndex(std::move(schemaPreset), Self->Ttl.TtlColumns());
+ Self->SetPrimaryIndex(std::move(schemaPreset));
}
{ // Load long tx writes
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index c8d8d450ff..352500107f 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -175,19 +175,16 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
}
// If no paths trigger schema defined TTL
- THashMap<ui64, NOlap::TTtlInfo> pathTtls;
+ THashMap<ui64, NOlap::TTiersInfo> pathTtls;
if (!ttlBody.GetPathIds().empty()) {
- ui64 unixTimeSec = ttlBody.GetUnixTimeSeconds();
- auto ts = std::make_shared<arrow::TimestampScalar>(unixTimeSec * 1000 * 1000,
- arrow::timestamp(arrow::TimeUnit::MICRO));
- TString columnName = ttlBody.GetTtlColumnName();
-
- if (!unixTimeSec || !ts->value) {
+ auto unixTime = TInstant::Seconds(ttlBody.GetUnixTimeSeconds());
+ if (!unixTime) {
statusMessage = "TTL tx wrong timestamp";
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
break;
}
+ TString columnName = ttlBody.GetTtlColumnName();
if (columnName.empty()) {
statusMessage = "TTL tx wrong TTL column";
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
@@ -195,12 +192,16 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
}
for (ui64 pathId : ttlBody.GetPathIds()) {
- pathTtls.emplace(pathId, NOlap::TTtlInfo{columnName, ts});
+ pathTtls.emplace(pathId, NOlap::TTiersInfo(columnName, unixTime));
}
}
if (auto event = Self->SetupTtl(pathTtls, true)) {
- ctx.Send(Self->SelfId(), event.release());
+ if (event->NeedWrites()) {
+ ctx.Send(Self->EvictionActor, event.release());
+ } else {
+ ctx.Send(Self->SelfId(), event->TxEvent.release());
+ }
status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
}
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index 5400d8660d..770d683895 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -75,8 +75,8 @@ private:
TMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats;
- static constexpr const ui64 NUM_KINDS = 4;
- static_assert(NUM_KINDS == NOlap::TPortionMeta::INACTIVE, "NUM_KINDS must match NOlap::TPortionMeta::EProduced enum");
+ static constexpr const ui64 NUM_KINDS = 5;
+ static_assert(NUM_KINDS == NOlap::TPortionMeta::EVICTED, "NUM_KINDS must match NOlap::TPortionMeta::EProduced enum");
std::shared_ptr<arrow::RecordBatch> FillStatsBatch() {
@@ -132,40 +132,51 @@ private:
using TUInt64 = arrow::UInt64Type::c_type;
using TUInt32 = arrow::UInt32Type::c_type;
- TUInt64 pathIds[NUM_KINDS] = {pathId, pathId, pathId, pathId};
- /// It's better to keep it in sync with TPortionMeta::EProduced
- TUInt32 kinds[NUM_KINDS] = {1, 2, 3, 4};
+ TUInt64 pathIds[NUM_KINDS] = {pathId, pathId, pathId, pathId, pathId};
+ /// It's in sync with TPortionMeta::EProduced
+ TUInt32 kinds[NUM_KINDS] = {
+ (ui32)NOlap::TPortionMeta::INSERTED,
+ (ui32)NOlap::TPortionMeta::COMPACTED,
+ (ui32)NOlap::TPortionMeta::SPLIT_COMPACTED,
+ (ui32)NOlap::TPortionMeta::INACTIVE,
+ (ui32)NOlap::TPortionMeta::EVICTED
+ };
ui64 tabletId = ReadMetadata->TabletId;
- TUInt64 tabletIds[NUM_KINDS] = {tabletId, tabletId, tabletId, tabletId};
+ TUInt64 tabletIds[NUM_KINDS] = {tabletId, tabletId, tabletId, tabletId, tabletId};
TUInt64 rows[NUM_KINDS] = {
stats.Inserted.Rows,
stats.Compacted.Rows,
stats.SplitCompacted.Rows,
- stats.Inactive.Rows
+ stats.Inactive.Rows,
+ stats.Evicted.Rows
};
TUInt64 bytes[NUM_KINDS] = {
stats.Inserted.Bytes,
stats.Compacted.Bytes,
stats.SplitCompacted.Bytes,
- stats.Inactive.Bytes
+ stats.Inactive.Bytes,
+ stats.Evicted.Bytes
};
TUInt64 rawBytes[NUM_KINDS] = {
stats.Inserted.RawBytes,
stats.Compacted.RawBytes,
stats.SplitCompacted.RawBytes,
- stats.Inactive.RawBytes
+ stats.Inactive.RawBytes,
+ stats.Evicted.RawBytes
};
TUInt64 portions[NUM_KINDS] = {
stats.Inserted.Portions,
stats.Compacted.Portions,
stats.SplitCompacted.Portions,
- stats.Inactive.Portions
+ stats.Inactive.Portions,
+ stats.Evicted.Portions
};
TUInt64 blobs[NUM_KINDS] = {
stats.Inserted.Blobs,
stats.Compacted.Blobs,
stats.SplitCompacted.Blobs,
- stats.Inactive.Blobs
+ stats.Inactive.Blobs,
+ stats.Evicted.Blobs
};
if (Reverse) {
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 08f3982efc..7b6b67a26b 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -36,12 +36,12 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
<< ") apply changes: " << *changes << " at tablet " << Self->TabletID());
TBlobManagerDb blobManagerDb(txc.DB);
- for (const auto& cmtd : Ev->Get()->IndexChanges->DataToIndex) {
+ for (const auto& cmtd : changes->DataToIndex) {
Self->InsertTable->EraseCommitted(dbWrap, cmtd);
Self->BlobManager->DeleteBlob(cmtd.BlobId, blobManagerDb);
}
- const auto& switchedPortions = Ev->Get()->IndexChanges->SwitchedPortions;
+ const auto& switchedPortions = changes->SwitchedPortions;
Self->IncCounter(COUNTER_PORTIONS_DEACTIVATED, switchedPortions.size());
THashSet<TUnifiedBlobId> blobsDeactivated;
@@ -57,7 +57,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
Self->IncCounter(COUNTER_BYTES_DEACTIVATED, blobId.BlobSize());
}
- for (auto& portionInfo : Ev->Get()->IndexChanges->AppendedPortions) {
+ for (auto& portionInfo : changes->AppendedPortions) {
switch (portionInfo.Meta.Produced) {
case NOlap::TPortionMeta::UNSPECIFIED:
Y_VERIFY(false); // unexpected
@@ -70,6 +70,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
case NOlap::TPortionMeta::SPLIT_COMPACTED:
Self->IncCounter(COUNTER_SPLIT_COMPACTION_PORTIONS_WRITTEN);
break;
+ case NOlap::TPortionMeta::EVICTED:
+ Y_FAIL("Unexpected evicted case");
+ break;
case NOlap::TPortionMeta::INACTIVE:
Y_FAIL("Unexpected inactive case");
break;
@@ -78,7 +81,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
// Put newly created blobs into cache
if (Ev->Get()->CacheData) {
for (const auto& columnRec : portionInfo.Records) {
- const auto* blob = Ev->Get()->IndexChanges->Blobs.FindPtr(columnRec.BlobRange);
+ const auto* blob = changes->Blobs.FindPtr(columnRec.BlobRange);
Y_VERIFY_DEBUG(blob, "Column data must be passed if CacheData is set");
if (blob) {
Y_VERIFY(columnRec.BlobRange.Size == blob->Size());
@@ -88,7 +91,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
}
}
- const auto& portionsToDrop = Ev->Get()->IndexChanges->PortionsToDrop;
+ Self->IncCounter(COUNTER_EVICTION_PORTIONS_WRITTEN, changes->PortionsToEvict.size());
+
+ const auto& portionsToDrop = changes->PortionsToDrop;
THashSet<TUnifiedBlobId> blobsToDrop;
Self->IncCounter(COUNTER_PORTIONS_ERASED, portionsToDrop.size());
for (const auto& portionInfo : portionsToDrop) {
@@ -98,6 +103,12 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
Self->IncCounter(COUNTER_RAW_BYTES_ERASED, portionInfo.RawBytesSum());
}
+ // Note: RAW_BYTES_ERASED and BYTES_ERASED counters are not in sync for evicted data
+ const auto& evictedRecords = changes->EvictedRecords;
+ for (const auto& rec : evictedRecords) {
+ blobsToDrop.insert(rec.BlobRange.BlobId);
+ }
+
Self->IncCounter(COUNTER_BLOBS_ERASED, blobsToDrop.size());
for (const auto& blobId : blobsToDrop) {
Self->BlobManager->DeleteBlob(blobId, blobManagerDb);
@@ -152,6 +163,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
Self->ActiveTtl = false;
Self->IncCounter(ok ? COUNTER_TTL_SUCCESS : COUNTER_TTL_FAIL);
+ Self->IncCounter(COUNTER_EVICTION_BLOBS_WRITTEN, blobsWritten);
+ Self->IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten);
}
Self->UpdateResourceMetrics(Ev->Get()->ResourceUsage);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 10dcea4d86..b1b866aa76 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -412,7 +412,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
schemaPreset.emplace(NOlap::TSnapshot{version.Step, version.TxId},
ConvertSchema(schemaPresetVerProto.GetSchema()));
- SetPrimaryIndex(std::move(schemaPreset), Ttl.TtlColumns());
+ SetPrimaryIndex(std::move(schemaPreset));
}
tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
@@ -544,14 +544,13 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
#endif
if (!schemaPreset.empty()) {
- SetPrimaryIndex(std::move(schemaPreset), Ttl.TtlColumns());
+ SetPrimaryIndex(std::move(schemaPreset));
}
}
-void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions,
- const THashSet<TString>& ttlColumns) {
+void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions) {
for (auto& [snap, indexInfo] : schemaVersions) {
- for (auto& columnName : ttlColumns) {
+ for (auto& columnName : Ttl.TtlColumns()) {
indexInfo.AddTtlColumn(columnName);
}
@@ -587,7 +586,11 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) {
}
if (auto event = SetupTtl()) {
- ctx.Send(SelfId(), event.release());
+ if (event->NeedWrites()) {
+ ctx.Send(EvictionActor, event.release());
+ } else {
+ ctx.Send(SelfId(), event->TxEvent.release());
+ }
}
LastBackActivation = TInstant::Now();
@@ -698,8 +701,8 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev));
}
-std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTtlInfo>& pathTtls,
- bool force) {
+std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls,
+ bool force) {
if (ActiveTtl) {
LOG_S_DEBUG("Ttl already in progress at tablet " << TabletID());
return {};
@@ -709,9 +712,9 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupTtl(const THashMap
return {};
}
- THashMap<ui64, NOlap::TTtlInfo> regularTtls;
+ THashMap<ui64, NOlap::TTiersInfo> regularTtls;
if (pathTtls.empty()) {
- regularTtls = Ttl.MakeIndexTtlMap(force);
+ regularTtls = Ttl.MakeIndexTtlMap(TInstant::Now(), force);
}
if (pathTtls.empty() && regularTtls.empty()) {
@@ -733,10 +736,11 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupTtl(const THashMap
return {};
}
+ bool needWrites = !indexChanges->PortionsToEvict.empty();
+
ActiveTtl = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, false);
- ev->PutStatus = NKikimrProto::OK; // No blobs to write, start TTxWriteIndex in event handler
- return ev;
+ return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), needWrites);
}
std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
@@ -792,6 +796,28 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return ev;
}
+static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) {
+ NOlap::TCompression out;
+ if (compression.HasCompressionCodec()) {
+ switch (compression.GetCompressionCodec()) {
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain:
+ out.Codec = arrow::Compression::UNCOMPRESSED;
+ break;
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4:
+ out.Codec = arrow::Compression::LZ4_FRAME;
+ break;
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD:
+ out.Codec = arrow::Compression::ZSTD;
+ break;
+ }
+ }
+
+ if (compression.HasCompressionLevel()) {
+ out.Level = compression.GetCompressionLevel();
+ }
+ return out;
+}
+
NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) {
Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
@@ -812,29 +838,17 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl
}
if (schema.HasDefaultCompression()) {
- auto& compression = schema.GetDefaultCompression();
- if (compression.HasCompressionCodec()) {
- arrow::Compression::type codec = arrow::Compression::LZ4_FRAME;
- switch (compression.GetCompressionCodec()) {
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain:
- codec = arrow::Compression::UNCOMPRESSED;
- break;
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4:
- codec = arrow::Compression::LZ4_FRAME; // TODO: should ColumnCodecLZ4 be mapped to LZ4 (row variant)?
- break;
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD:
- codec = arrow::Compression::ZSTD;
- break;
- }
- indexInfo.SetDefaultCompressionCodec(codec);
- }
+ NOlap::TCompression compression = ConvertCompression(schema.GetDefaultCompression());
+ indexInfo.SetDefaultCompression(compression);
+ }
- if (compression.HasCompressionLevel()) {
- int level = compression.GetCompressionLevel();
- indexInfo.SetDefaultCompressionLevel(level);
- } else {
- indexInfo.SetDefaultCompressionLevel(); // set default
+ for (auto& tierConfig : schema.GetStorageTiers()) {
+ NOlap::TStorageTier tier;
+ tier.Name = tierConfig.GetName();
+ if (tierConfig.HasCompression()) {
+ tier.Compression = ConvertCompression(tierConfig.GetCompression());
}
+ indexInfo.AddStorageTier(std::move(tier));
}
return indexInfo;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 4c3c73fd63..b067efa3d7 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -45,6 +45,7 @@ class TColumnShard
{
friend class TIndexingActor;
friend class TCompactionActor;
+ friend class TEvictionActor;
friend class TTxInit;
friend class TTxInitSchema;
friend class TTxUpdateSchema;
@@ -307,6 +308,7 @@ private:
TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices.
TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compaction.
+ TActorId EvictionActor;
std::unique_ptr<TTabletCountersBase> TabletCountersPtr;
TTabletCountersBase* TabletCounters;
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
@@ -374,14 +376,14 @@ private:
void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
- void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions, const THashSet<TString>& ttlColumns);
+ void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions);
NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema);
std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation();
std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction();
- std::unique_ptr<TEvPrivate::TEvWriteIndex> SetupTtl(const THashMap<ui64, NOlap::TTtlInfo>& pathTtls = {},
- bool force = false);
+ std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls = {},
+ bool force = false);
std::unique_ptr<TEvPrivate::TEvWriteIndex> SetupCleanup();
void UpdateBlobMangerCounters();
diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h
index 3dc52a0d8a..91802398ca 100644
--- a/ydb/core/tx/columnshard/columnshard_ttl.h
+++ b/ydb/core/tx/columnshard/columnshard_ttl.h
@@ -9,37 +9,52 @@ public:
struct TEviction {
TString TierName;
- TString ColumnName;
- TDuration ExpireAfter;
+ TDuration EvictAfter;
};
struct TDescription {
+ TString ColumnName;
std::vector<TEviction> Evictions;
TDescription() = default;
TDescription(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl) {
+ TDuration prevEvicSec;
+
if (ttl.HasEnabled()) {
auto& enabled = ttl.GetEnabled();
- Evictions.emplace_back(
- TEviction{{}, enabled.GetColumnName(), TDuration::Seconds(enabled.GetExpireAfterSeconds())});
+ ColumnName = enabled.GetColumnName();
+ auto expireSec = TDuration::Seconds(enabled.GetExpireAfterSeconds());
+
+ Evictions.reserve(1);
+ Evictions.emplace_back(TEviction{{}, expireSec});
} else if (ttl.HasTiering()) {
+ Evictions.reserve(ttl.GetTiering().TiersSize());
+
for (auto& tier : ttl.GetTiering().GetTiers()) {
auto& eviction = tier.GetEviction();
- Evictions.emplace_back(TEviction{tier.GetName(),
- eviction.GetColumnName(), TDuration::Seconds(eviction.GetExpireAfterSeconds())});
+ Y_VERIFY(ColumnName.empty() || ColumnName == eviction.GetColumnName());
+ ColumnName = eviction.GetColumnName();
+ auto evictSec = TDuration::Seconds(eviction.GetExpireAfterSeconds());
+
+ // Ignore next tier if it has smaller eviction time. Prefer first tier with same eviction time.
+ if (evictSec > prevEvicSec) {
+ Evictions.emplace_back(TEviction{tier.GetName(), evictSec});
+ prevEvicSec = evictSec;
+ }
}
+
+ Evictions.shrink_to_fit();
+ }
+
+ if (Enabled()) {
+ Y_VERIFY(!ColumnName.empty());
}
}
bool Enabled() const {
return !Evictions.empty();
}
-
- const TEviction& LastTier() const {
- Y_VERIFY(!Evictions.empty());
- return Evictions.back();
- }
};
ui64 PathsCount() const {
@@ -48,10 +63,11 @@ public:
void SetPathTtl(ui64 pathId, TDescription&& descr) {
if (descr.Enabled()) {
- for (auto& evict : descr.Evictions) {
- if (!evict.ColumnName.empty()) {
- Columns.insert(evict.ColumnName);
- }
+ auto it = Columns.find(descr.ColumnName);
+ if (it != Columns.end()) {
+ descr.ColumnName = *it; // replace string dups (memory efficiency)
+ } else {
+ Columns.insert(descr.ColumnName);
}
PathTtls[pathId] = descr;
} else {
@@ -63,17 +79,17 @@ public:
PathTtls.erase(pathId);
}
- THashMap<ui64, NOlap::TTtlInfo> MakeIndexTtlMap(bool force = false) {
- if ((TInstant::Now() < LastRegularTtl + RegularTtlTimeout) && !force) {
+ THashMap<ui64, NOlap::TTiersInfo> MakeIndexTtlMap(TInstant now, bool force = false) {
+ if ((now < LastRegularTtl + RegularTtlTimeout) && !force) {
return {};
}
- THashMap<ui64, NOlap::TTtlInfo> out;
+ THashMap<ui64, NOlap::TTiersInfo> out;
for (auto& [pathId, descr] : PathTtls) {
- out.emplace(pathId, Convert(descr));
+ out.emplace(pathId, Convert(descr, now));
}
- LastRegularTtl = TInstant::Now();
+ LastRegularTtl = now;
return out;
}
@@ -85,13 +101,16 @@ private:
TDuration RegularTtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)};
TInstant LastRegularTtl;
- NOlap::TTtlInfo Convert(const TDescription& descr) const {
- auto& lastTier = descr.LastTier();
- ui64 border = (TInstant::Now() - lastTier.ExpireAfter).MicroSeconds();
- return NOlap::TTtlInfo{
- .Column = lastTier.ColumnName,
- .Border = std::make_shared<arrow::TimestampScalar>(border, arrow::timestamp(arrow::TimeUnit::MICRO))
- };
+ NOlap::TTiersInfo Convert(const TDescription& descr, TInstant timePoint) const {
+ Y_VERIFY(descr.Enabled());
+ NOlap::TTiersInfo out(descr.ColumnName);
+
+ for (auto& tier : descr.Evictions) {
+ auto border = timePoint - tier.EvictAfter;
+ out.AddTier(tier.TierName, border);
+ }
+
+ return out;
}
};
diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h
index b7267dd8ff..a5662a6876 100644
--- a/ydb/core/tx/columnshard/columnshard_txs.h
+++ b/ydb/core/tx/columnshard/columnshard_txs.h
@@ -23,6 +23,7 @@ struct TEvPrivate {
EvScanStats,
EvReadFinished,
EvPeriodicWakeup,
+ EvEviction,
EvEnd
};
@@ -68,6 +69,22 @@ struct TEvPrivate {
}
};
+ struct TEvEviction : public TEventLocal<TEvEviction, EvEviction> {
+ std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
+
+ explicit TEvEviction(std::unique_ptr<TEvPrivate::TEvWriteIndex> txEvent, bool needWrites)
+ : TxEvent(std::move(txEvent))
+ {
+ if (!needWrites) {
+ TxEvent->PutStatus = NKikimrProto::OK;
+ }
+ }
+
+ bool NeedWrites() const {
+ return (TxEvent->PutStatus != NKikimrProto::OK);
+ }
+ };
+
struct TEvScanStats : public TEventLocal<TEvScanStats, EvScanStats> {
TEvScanStats(ui64 rows, ui64 bytes) : Rows(rows), Bytes(bytes) {}
ui64 Rows;
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 139e83198a..4920b5c694 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -53,8 +53,11 @@ struct TTestSchema {
return !Codec.empty();
}
- TStorageTier& SetCodec(const TString& codec) {
+ TStorageTier& SetCodec(const TString& codec, std::optional<int> level = {}) {
Codec = codec;
+ if (level) {
+ CompressionLevel = *level;
+ }
return *this;
}
@@ -186,6 +189,7 @@ struct TTestSchema {
auto* tiering = ttlSettings->MutableTiering();
for (auto& tier : specials.Tiers) {
auto* t = tiering->AddTiers();
+ t->SetName(tier.Name);
t->MutableEviction()->SetColumnName(tier.TtlColumn);
t->MutableEviction()->SetExpireAfterSeconds(*tier.EvictAfterSeconds);
}
@@ -215,6 +219,7 @@ struct TTestSchema {
auto* tiering = ttlSettings->MutableTiering();
for (auto& tier : specials.Tiers) {
auto* t = tiering->AddTiers();
+ t->SetName(tier.Name);
t->MutableEviction()->SetColumnName(tier.TtlColumn);
t->MutableEviction()->SetExpireAfterSeconds(*tier.EvictAfterSeconds);
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index bad39f81ca..8fb6ac424a 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -57,9 +57,44 @@ struct TCompactionInfo {
}
};
-struct TTtlInfo {
+struct TTiersInfo {
+ struct TTierTimeBorder {
+ TString TierName;
+ TInstant EvictBorder;
+
+ TTierTimeBorder(TString tierName, TInstant evictBorder)
+ : TierName(tierName)
+ , EvictBorder(evictBorder)
+ {}
+
+ std::shared_ptr<arrow::Scalar> ToTimestamp() const {
+ if (Scalar) {
+ return Scalar;
+ }
+
+ Scalar = std::make_shared<arrow::TimestampScalar>(
+ EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
+ return Scalar;
+ }
+
+ private:
+ mutable std::shared_ptr<arrow::Scalar> Scalar;
+ };
+
TString Column;
- std::shared_ptr<arrow::Scalar> Border;
+ std::vector<TTierTimeBorder> TierBorders; // Ordered tiers from hottest to coldest
+
+ TTiersInfo(const TString& column, TInstant border = {}, const TString& tierName = {})
+ : Column(column)
+ {
+ if (border) {
+ AddTier(tierName, border);
+ }
+ }
+
+ void AddTier(const TString& tierName, TInstant border) {
+ TierBorders.emplace_back(TTierTimeBorder(tierName, border));
+ }
};
class TColumnEngineChanges {
@@ -92,6 +127,8 @@ public:
TVector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones
TVector<TPortionInfo> AppendedPortions; // New portions after indexing or compaction
TVector<TPortionInfo> PortionsToDrop;
+ TVector<std::pair<TPortionInfo, TString>> PortionsToEvict; // {portion, target tier name}
+ TVector<TColumnRecord> EvictedRecords;
TVector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule}
THashMap<TBlobRange, TString> Blobs;
@@ -146,6 +183,13 @@ public:
}
out << "; ";
}
+ if (ui32 evicted = changes.PortionsToEvict.size()) {
+ out << "evict " << evicted << " portions";
+ for (auto& [portionInfo, tier] : changes.PortionsToEvict) {
+ out << portionInfo << " (to " << tier << ")";
+ }
+ out << "; ";
+ }
if (ui32 dropped = changes.PortionsToDrop.size()) {
out << "drop " << dropped << " portions";
for (auto& portionInfo : changes.PortionsToDrop) {
@@ -260,6 +304,7 @@ struct TColumnEngineStats {
TPortionsStats Compacted{};
TPortionsStats SplitCompacted{};
TPortionsStats Inactive{};
+ TPortionsStats Evicted{};
void Clear() {
*this = {};
@@ -288,7 +333,7 @@ public:
const TSnapshot& outdatedSnapshot) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop) = 0;
- virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTtlInfo>& pathTtls) = 0;
+ virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) = 0;
virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0;
virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
//virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 6fc16a5e3f..e1669b286d 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -15,12 +15,14 @@ std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TIndexInfo& inde
namespace {
-arrow::ipc::IpcWriteOptions WriteOptions(arrow::Compression::type codec, const std::optional<int>& level) {
+arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
+ auto& codec = compression.Codec;
+
arrow::ipc::IpcWriteOptions options(arrow::ipc::IpcWriteOptions::Defaults());
Y_VERIFY(arrow::util::Codec::IsAvailable(codec));
arrow::Result<std::unique_ptr<arrow::util::Codec>> resCodec;
- if (level) {
- resCodec = arrow::util::Codec::Create(codec, *level);
+ if (compression.Level) {
+ resCodec = arrow::util::Codec::Create(codec, *compression.Level);
if (!resCodec.ok()) {
resCodec = arrow::util::Codec::Create(codec);
}
@@ -49,7 +51,9 @@ ui64 ExtractTimestamp(const std::shared_ptr<TPredicate>& pkPredicate, const std:
// Although source batches are ordered only by PK (sorting key) resulting pathBatches are ordered by extended key.
// They have const snapshot columns that do not break sorting inside batch.
-std::shared_ptr<arrow::RecordBatch> AddSpecials(const TIndexInfo& indexInfo, const TInsertedData& inserted, const TString& data) {
+std::shared_ptr<arrow::RecordBatch> AddSpecials(const TIndexInfo& indexInfo, const TInsertedData& inserted,
+ const TString& data)
+{
auto schema = indexInfo.ArrowSchema();
Y_VERIFY(!data.empty(), "Blob data not present");
auto batch = NArrow::DeserializeBatch(data, schema);
@@ -70,6 +74,50 @@ std::shared_ptr<arrow::RecordBatch> AddSpecials(const TIndexInfo& indexInfo, con
return arrow::RecordBatch::Make(extendedSchema, numRows, columns);
}
+bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, const TString& tierName,
+ const THashMap<TBlobRange, TString>& srcBlobs,
+ TVector<TColumnRecord>& evictedRecords, TVector<TString>& newBlobs)
+{
+ Y_VERIFY(portionInfo.TierName != tierName);
+
+ auto compression = indexInfo.GetTierCompression(tierName);
+ if (!compression) {
+        // Nothing to recompress. We have no other kinds of evictions yet. Return.
+ portionInfo.TierName = tierName;
+ return true;
+ }
+
+ auto schema = indexInfo.ArrowSchemaWithSpecials();
+ auto batch = portionInfo.AssembleInBatch(indexInfo, schema, srcBlobs);
+ auto writeOptions = WriteOptions(*compression);
+
+ TPortionInfo undo = portionInfo;
+ size_t undoSize = newBlobs.size();
+
+ std::vector<TString> blobs;
+ for (auto& rec : portionInfo.Records) {
+ auto colName = indexInfo.GetColumnName(rec.ColumnId);
+ std::string name(colName.data(), colName.size());
+ auto field = schema->GetFieldByName(name);
+
+ auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(name), field, writeOptions);
+ if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
+ portionInfo = undo;
+ newBlobs.resize(undoSize);
+ return false;
+ }
+ newBlobs.emplace_back(std::move(blob));
+ rec.BlobRange = TBlobRange{};
+ }
+
+ for (auto& rec : undo.Records) {
+ evictedRecords.emplace_back(std::move(rec));
+ }
+
+ portionInfo.AddMetadata(indexInfo, batch, tierName);
+ return true;
+}
+
TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo,
std::shared_ptr<arrow::RecordBatch> batch,
ui64 granule,
@@ -79,7 +127,12 @@ TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo,
auto schema = indexInfo.ArrowSchemaWithSpecials();
TVector<TPortionInfo> out;
- auto writeOptions = WriteOptions(indexInfo.CompressionCodec(), indexInfo.CompressionLevel());
+ TString tierName = indexInfo.GetTierName(0);
+ auto compression = indexInfo.GetTierCompression(0);
+ if (!compression) {
+ compression = indexInfo.GetDefaultCompression();
+ }
+ auto writeOptions = WriteOptions(*compression);
std::shared_ptr<arrow::RecordBatch> portionBatch = batch;
for (i32 pos = 0; pos < batch->num_rows();) {
@@ -111,7 +164,7 @@ TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo,
}
if (ok) {
- portionInfo.AddMetadata(indexInfo, portionBatch);
+ portionInfo.AddMetadata(indexInfo, portionBatch, tierName);
out.emplace_back(std::move(portionInfo));
for (auto& blob : portionBlobs) {
blobs.push_back(blob);
@@ -341,7 +394,8 @@ ui64 TColumnEngineForLogs::MemoryUsage() const {
ui64 numPortions = Counters.Inserted.Portions +
Counters.Compacted.Portions +
Counters.SplitCompacted.Portions +
- Counters.Inactive.Portions;
+ Counters.Inactive.Portions +
+ Counters.Evicted.Portions;
return Counters.Granules * (sizeof(TGranuleMeta) + sizeof(ui64)) +
numPortions * (sizeof(TPortionInfo) + sizeof(ui64)) +
@@ -414,6 +468,9 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
case NOlap::TPortionMeta::INACTIVE:
srcStats = &engineStats.Inactive;
break;
+ case NOlap::TPortionMeta::EVICTED:
+ srcStats = &engineStats.Evicted;
+ break;
}
Y_VERIFY(srcStats);
auto* stats = portionInfo.IsActive() ? srcStats : &engineStats.Inactive;
@@ -699,7 +756,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
return changes;
}
-std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTtlInfo>& pathTtls) {
+std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) {
if (pathTtls.empty()) {
return {};
}
@@ -716,6 +773,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
continue;
}
+ Y_VERIFY(!ttl.TierBorders.empty());
+
ui32 ttlColumnId = IndexInfo.GetColumnId(ttl.Column);
auto& tsGranule = PathGranules[pathId];
for (auto [ts, granule] : tsGranule) {
@@ -723,8 +782,23 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
Y_VERIFY(spg);
for (auto& [portion, info] : spg->Portions) {
+ if (!info.IsActive()) {
+ continue;
+ }
+
if (auto max = info.MaxValue(ttlColumnId)) {
- if (NArrow::ScalarLess(*max, *ttl.Border)) {
+ bool keep = false;
+ for (auto& border : ttl.TierBorders) {
+ if (NArrow::ScalarLess(*border.ToTimestamp(), *max)) {
+ keep = true;
+ if (info.TierName != border.TierName) {
+ changes->PortionsToEvict.emplace_back(info, border.TierName);
+ }
+ break;
+ }
+ }
+ if (!keep) {
+ Y_VERIFY(!NArrow::ScalarLess(*ttl.TierBorders.back().ToTimestamp(), *max));
changes->PortionsToDrop.push_back(info);
}
}
@@ -732,7 +806,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
}
}
- if (changes->PortionsToDrop.empty()) {
+ if (changes->PortionsToDrop.empty() &&
+ changes->PortionsToEvict.empty()) {
return {};
}
@@ -815,17 +890,19 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
for (auto& portionInfo : changes->AppendedPortions) {
portionInfo.UpdateRecords(++portion, granuleRemap, snapshot);
- portionInfo.Meta.Produced = TPortionMeta::INSERTED;
+ TPortionMeta::EProduced produced = TPortionMeta::INSERTED;
// If it's a split compaction with moves appended portions are INSERTED (could have overlaps with others)
if (changes->IsCompaction() && changes->PortionsToMove.empty()) {
Y_VERIFY(changes->CompactionInfo);
- portionInfo.Meta.Produced = changes->CompactionInfo->InGranule ?
+ produced = changes->CompactionInfo->InGranule ?
TPortionMeta::COMPACTED : TPortionMeta::SPLIT_COMPACTED;
}
- for (auto& record : portionInfo.Records) {
- record.Metadata = portionInfo.GetMetadata(record);
- }
+ portionInfo.UpdateRecordsMeta(produced);
+ }
+
+ for (auto& [portionInfo, _] : changes->PortionsToEvict) {
+ portionInfo.UpdateRecordsMeta(TPortionMeta::EVICTED);
}
for (auto& [_, id] : changes->PortionsToMove) {
@@ -940,7 +1017,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
ui64 granule = portionInfo.Granule();
if (!Granules.count(granule)) {
- LOG_S_ERROR("Cannot update portions with unknown granule " << granule << " at tablet " << TabletId);
+ LOG_S_ERROR("Cannot update portion " << portionInfo << " with unknown granule at tablet " << TabletId);
return false;
}
@@ -970,6 +1047,43 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
}
}
+ // Update evicted portions
+    // There could be a race between compaction and eviction. Allow compaction and disallow eviction in this case.
+
+ for (auto& [info, _] : changes.PortionsToEvict) {
+ const auto& portionInfo = info;
+ Y_VERIFY(!portionInfo.Empty());
+ Y_VERIFY(portionInfo.IsActive());
+
+ ui64 granule = portionInfo.Granule();
+ ui64 portion = portionInfo.Portion();
+ if (!Granules.count(granule) || !Granules[granule]->Portions.count(portion)) {
+ LOG_S_ERROR("Cannot evict unknown portion " << portionInfo << " at tablet " << TabletId);
+ return false;
+ }
+
+        // In case of a race with compaction, the portion could become inactive
+ // TODO: evict others instead of abort eviction
+ auto& oldInfo = Granules[granule]->Portions[portion];
+ if (!oldInfo.IsActive()) {
+ LOG_S_WARN("Cannot evict inactive portion " << oldInfo << " at tablet " << TabletId);
+ return false;
+ }
+ Y_VERIFY(portionInfo.TierName != oldInfo.TierName);
+
+ // TODO: update stats
+ if (!UpsertPortion(portionInfo, apply, false)) {
+ LOG_S_ERROR("Cannot evict portion " << portionInfo << " at tablet " << TabletId);
+ return false;
+ }
+
+ if (apply) {
+ for (auto& record : portionInfo.Records) {
+ ColumnsTable->Write(db, record);
+ }
+ }
+ }
+
// Move portions in granules (zero-copy switch + append into new granules)
for (auto& [info, granule] : changes.PortionsToMove) {
@@ -1795,4 +1909,29 @@ TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo,
return CompactSplitGranule(indexInfo, castedChanges);
}
+TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo,
+ std::shared_ptr<TColumnEngineChanges> changes) {
+ Y_VERIFY(changes);
+ Y_VERIFY(!changes->Blobs.empty()); // src data
+ Y_VERIFY(!changes->PortionsToEvict.empty()); // src meta
+ Y_VERIFY(changes->EvictedRecords.empty()); // dst meta
+
+ TVector<TString> newBlobs;
+ TVector<std::pair<TPortionInfo, TString>> evicted;
+ evicted.reserve(changes->PortionsToEvict.size());
+
+ for (auto& [portionInfo, tierName] : changes->PortionsToEvict) {
+ Y_VERIFY(!portionInfo.Empty());
+ Y_VERIFY(portionInfo.IsActive());
+
+ if (UpdateEvictedPortion(portionInfo, indexInfo, tierName, changes->Blobs, changes->EvictedRecords, newBlobs)) {
+ Y_VERIFY(portionInfo.TierName == tierName);
+ evicted.emplace_back(std::move(portionInfo), TString{});
+ }
+ }
+
+ changes->PortionsToEvict.swap(evicted);
+ return newBlobs;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index dca2e6afc1..a7faeb2bd2 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -141,7 +141,7 @@ public:
const TSnapshot& outdatedSnapshot) override;
std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop) override;
- std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTtlInfo>& pathTtls) override;
+ std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) override;
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) override;
void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override;
@@ -164,6 +164,9 @@ public:
/// @note called from CompactionActor
static TVector<TString> CompactBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes);
+ /// @note called from EvictionActor
+ static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes);
+
static ui64 ExtractKey(const TString& key) {
Y_VERIFY(key.size() == 8);
return *reinterpret_cast<const ui64*>(key.data());
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 63cca9a6b9..20f2b3ea0b 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -54,6 +54,16 @@ GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const TVector<ui32>
struct TInsertedData;
+struct TCompression {
+ arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME};
+ std::optional<int> Level;
+};
+
+struct TStorageTier {
+ TString Name;
+ std::optional<TCompression> Compression;
+};
+
/// Column engine index description in terms of tablet's local table.
/// We have to use YDB types for keys here.
struct TIndexInfo : public NTable::TScheme::TTableSchema {
@@ -189,10 +199,45 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema {
static bool IsSpecialColumn(const arrow::Field& field);
static std::vector<std::shared_ptr<arrow::Array>> MakeSpecialColumns(const TInsertedData& blob, ui64 size);
- arrow::Compression::type CompressionCodec() const { return DefaultCompressionCodec; }
- void SetDefaultCompressionCodec(arrow::Compression::type codec) { DefaultCompressionCodec = codec; }
- std::optional<int> CompressionLevel() const { return DefaultCompressionLevel; }
- void SetDefaultCompressionLevel(const std::optional<int>& level = {}) { DefaultCompressionLevel = level; }
+ void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; }
+ const TCompression& GetDefaultCompression() const { return DefaultCompression; }
+
+ std::optional<TCompression> GetTierCompression(ui32 tierNo) const {
+ if (!Tiers.empty()) {
+ Y_VERIFY(tierNo < Tiers.size());
+ return Tiers[tierNo].Compression;
+ }
+ return {};
+ }
+
+ std::optional<TCompression> GetTierCompression(const TString& tierName) const {
+ if (tierName.empty()) {
+ return {};
+ }
+ ui32 tierNo = GetTierNumber(tierName);
+ Y_VERIFY(tierNo != Max<ui32>());
+ return GetTierCompression(tierNo);
+ }
+
+ TString GetTierName(ui32 tierNo) const {
+ if (!Tiers.empty()) {
+ Y_VERIFY(tierNo < Tiers.size());
+ return Tiers[tierNo].Name;
+ }
+ return {};
+ }
+
+ void AddStorageTier(TStorageTier&& tier) {
+ TierByName[tier.Name] = Tiers.size();
+ Tiers.emplace_back(std::move(tier));
+ }
+
+ ui32 GetTierNumber(const TString& tierName) const {
+ if (auto it = TierByName.find(tierName); it != TierByName.end()) {
+ return it->second;
+ }
+ return Max<ui32>();
+ }
private:
ui32 Id;
@@ -205,8 +250,9 @@ private:
std::shared_ptr<arrow::Schema> IndexKey;
THashSet<TString> RequiredColumns;
THashSet<ui32> MinMaxIdxColumnsIds;
- arrow::Compression::type DefaultCompressionCodec{arrow::Compression::LZ4_FRAME};
- std::optional<int> DefaultCompressionLevel;
+ TCompression DefaultCompression;
+ std::vector<TStorageTier> Tiers;
+ THashMap<TString, ui32> TierByName;
void AddRequiredColumns(const TVector<TString>& columns) {
for (auto& name: columns) {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index e67ab39605..15ae8968be 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -3,25 +3,20 @@
namespace NKikimr::NOlap {
-namespace {
-
-TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
- const std::shared_ptr<arrow::Field>& field,
- const arrow::ipc::IpcWriteOptions& writeOptions, int64_t& memSize) {
-
+TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array,
+ const std::shared_ptr<arrow::Field>& field,
+ const arrow::ipc::IpcWriteOptions& writeOptions)
+{
std::vector<std::shared_ptr<arrow::Field>> tmp{field};
auto schema = std::make_shared<arrow::Schema>(tmp);
auto batch = arrow::RecordBatch::Make(schema, array->length(), {array});
Y_VERIFY(batch);
-#if 0
- auto status = GetRecordBatchSize(*batch, arrow::ipc::IpcWriteOptions::Defaults(), &memSize);
- Y_VERIFY(status.ok());
-#else
- Y_UNUSED(memSize);
-#endif
+
return NArrow::SerializeBatch(batch, writeOptions);
}
+namespace {
+
std::shared_ptr<arrow::ChunkedArray> DeserializeBlobs(const TVector<TString>& blobs, std::shared_ptr<arrow::Field> field) {
Y_VERIFY(!blobs.empty());
std::vector<std::shared_ptr<arrow::Field>> tmp{field};
@@ -46,8 +41,7 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr
TColumnRecord&& record,
const arrow::ipc::IpcWriteOptions& writeOptions,
ui32 limitBytes) {
- int64_t memSize = 0;
- auto blob = SerializeColumn(array, field, writeOptions, memSize);
+ auto blob = SerializeColumn(array, field, writeOptions);
if (blob.size() >= limitBytes) {
return {};
}
@@ -124,7 +118,11 @@ void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>&
Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second);
}
-void TPortionInfo::AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch) {
+void TPortionInfo::AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TString& tierName) {
+ TierName = tierName;
+ Meta = {};
+
/// @note It does not add RawBytes info for snapshot columns, only for user ones.
for (auto& [columnId, col] : indexInfo.Columns) {
auto column = batch->GetColumnByName(col.Name);
@@ -165,23 +163,32 @@ TString TPortionInfo::GetMetadata(const TColumnRecord& rec) const {
}
if (rec.ColumnId == FirstPkColumn) {
+ auto* portionMeta = meta.MutablePortionMeta();
+
switch (Meta.Produced) {
case TPortionMeta::UNSPECIFIED:
Y_VERIFY(false);
case TPortionMeta::INSERTED:
- meta.MutablePortionMeta()->SetIsInserted(true);
+ portionMeta->SetIsInserted(true);
break;
case TPortionMeta::COMPACTED:
- meta.MutablePortionMeta()->SetIsCompacted(true);
+ portionMeta->SetIsCompacted(true);
break;
case TPortionMeta::SPLIT_COMPACTED:
- meta.MutablePortionMeta()->SetIsSplitCompacted(true);
+ portionMeta->SetIsSplitCompacted(true);
+ break;
+ case TPortionMeta::EVICTED:
+ portionMeta->SetIsEvicted(true);
break;
case TPortionMeta::INACTIVE:
Y_FAIL("Unexpected inactive case");
- //meta.MutablePortionMeta()->SetInactive(true);
+ //portionMeta->SetInactive(true);
break;
}
+
+ if (!TierName.empty()) {
+ portionMeta->SetTierName(TierName);
+ }
}
TString out;
@@ -203,12 +210,16 @@ void TPortionInfo::LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord
if (meta.HasPortionMeta()) {
auto& portionMeta = meta.GetPortionMeta();
+ TierName = portionMeta.GetTierName();
+
if (portionMeta.GetIsInserted()) {
Meta.Produced = TPortionMeta::INSERTED;
} else if (portionMeta.GetIsCompacted()) {
Meta.Produced = TPortionMeta::COMPACTED;
} else if (portionMeta.GetIsSplitCompacted()) {
Meta.Produced = TPortionMeta::SPLIT_COMPACTED;
+ } else if (portionMeta.GetIsEvicted()) {
+ Meta.Produced = TPortionMeta::EVICTED;
}
}
if (meta.HasNumRows()) {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 884610037e..fa35d36530 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -12,7 +12,8 @@ struct TPortionMeta {
INSERTED = 1,
COMPACTED = 2,
SPLIT_COMPACTED = 3,
- INACTIVE = 4
+ INACTIVE = 4,
+ EVICTED = 5,
};
struct TColumnMeta {
@@ -42,9 +43,12 @@ struct TPortionMeta {
};
struct TPortionInfo {
+ static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024;
+
TVector<TColumnRecord> Records;
TPortionMeta Meta;
ui32 FirstPkColumn = 0;
+ TString TierName;
bool Empty() const { return Records.empty(); }
bool Valid() const { return !Empty() && Meta.Produced != TPortionMeta::UNSPECIFIED && HasMinMax(FirstPkColumn); }
@@ -112,6 +116,13 @@ struct TPortionInfo {
}
}
+ void UpdateRecordsMeta(TPortionMeta::EProduced produced) {
+ Meta.Produced = produced;
+ for (auto& record : Records) {
+ record.Metadata = GetMetadata(record);
+ }
+ }
+
void SetStale(const TSnapshot& snapshot) {
for (auto& rec : Records) {
rec.SetXSnapshot(snapshot);
@@ -125,7 +136,8 @@ struct TPortionInfo {
TString GetMetadata(const TColumnRecord& rec) const;
void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec);
- void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch);
+ void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TString& tierName);
void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
@@ -176,11 +188,15 @@ struct TPortionInfo {
const std::shared_ptr<arrow::Schema>& schema,
const THashMap<TBlobRange, TString>& data) const;
+ static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
+ const std::shared_ptr<arrow::Field>& field,
+ const arrow::ipc::IpcWriteOptions& writeOptions);
+
TString AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
TColumnRecord&& record,
const arrow::ipc::IpcWriteOptions& writeOptions,
- ui32 limitBytes = 8 * 1024 * 1024);
+ ui32 limitBytes = BLOB_BYTES_LIMIT);
friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) {
for (auto& rec : info.Records) {
@@ -188,6 +204,9 @@ struct TPortionInfo {
out << " (1 of " << info.Records.size() << " blobs shown)";
break;
}
+ if (!info.TierName.empty()) {
+ out << " tier: " << info.TierName;
+ }
out << " " << info.Meta;
return out;
}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 7bdfb5b7bd..42f1544007 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -225,9 +225,8 @@ public:
return batch;
}
- static NOlap::TTtlInfo MakeTtl(ui64 ts) {
- auto scalar = std::make_shared<arrow::TimestampScalar>(ts, arrow::timestamp(arrow::TimeUnit::MICRO));
- return NOlap::TTtlInfo{testColumns[0].first, scalar};
+ static NOlap::TTiersInfo MakeTtl(TInstant border) {
+ return NOlap::TTiersInfo(testColumns[0].first, border);
}
private:
@@ -341,7 +340,7 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u
}
bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db,
- const THashMap<ui64, NOlap::TTtlInfo>& pathTtls, ui32 expectedToDrop) {
+ const THashMap<ui64, NOlap::TTiersInfo>& pathTtls, ui32 expectedToDrop) {
std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathTtls);
UNIT_ASSERT(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop);
@@ -649,8 +648,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
// TTL
- THashMap<ui64, NOlap::TTtlInfo> pathTtls;
- pathTtls.emplace(pathId, TBuilder::MakeTtl(10000));
+ THashMap<ui64, NOlap::TTiersInfo> pathTtls;
+ pathTtls.emplace(pathId, TBuilder::MakeTtl(TInstant::MicroSeconds(10000)));
Ttl(engine, db, pathTtls, 2);
// read + load + read
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
new file mode 100644
index 0000000000..da752e96d2
--- /dev/null
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -0,0 +1,151 @@
+#include "columnshard_impl.h"
+#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
+#include "blob_cache.h"
+
+namespace NKikimr::NColumnShard {
+
+using NOlap::TBlobRange;
+
+class TEvictionActor : public TActorBootstrapped<TEvictionActor> {
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::TX_COLUMNSHARD_EVICTION_ACTOR;
+ }
+
+ TEvictionActor(ui64 tabletId, const TActorId& parent)
+ : TabletId(tabletId)
+ , Parent(parent)
+ , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
+ {}
+
+ void Handle(TEvPrivate::TEvEviction::TPtr& ev, const TActorContext& ctx) {
+ auto& event = *ev->Get();
+ TxEvent = std::move(event.TxEvent);
+ Y_VERIFY(TxEvent);
+ Y_VERIFY(Blobs.empty() && !NumRead);
+
+ auto& indexChanges = TxEvent->IndexChanges;
+ Y_VERIFY(indexChanges);
+
+ LOG_S_DEBUG("Portions eviction: " << *indexChanges << " at tablet " << TabletId);
+
+ auto& evictedPortions = indexChanges->PortionsToEvict;
+ Y_VERIFY(evictedPortions.size());
+
+ for (auto& [portionInfo, tierName] : evictedPortions) {
+ Y_VERIFY(!portionInfo.Empty());
+ std::vector<NBlobCache::TBlobRange> ranges;
+ for (auto& rec : portionInfo.Records) {
+ auto& blobRange = rec.BlobRange;
+ Blobs[blobRange] = {};
+ // Group only ranges from the same blob into one request
+ if (!ranges.empty() && ranges.back().BlobId != blobRange.BlobId) {
+ SendReadRequest(ctx, std::move(ranges));
+ ranges = {};
+ }
+ ranges.push_back(blobRange);
+ }
+ SendReadRequest(ctx, std::move(ranges));
+ }
+ }
+
+ void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) {
+ LOG_S_TRACE("TEvReadBlobRangeResult (got " << NumRead << " of " << Blobs.size()
+ << ") at tablet " << TabletId << " (eviction)");
+
+ auto& event = *ev->Get();
+ const TBlobRange& blobId = event.BlobRange;
+ Y_VERIFY(Blobs.count(blobId));
+ if (!Blobs[blobId].empty()) {
+ return;
+ }
+
+ if (event.Status == NKikimrProto::EReplyStatus::OK) {
+ Y_VERIFY(event.Data.size());
+
+ TString blobData = event.Data;
+ Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size);
+ Blobs[blobId] = blobData;
+ } else {
+ LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << event.Status
+ << " at tablet " << TabletId << " (eviction)");
+ TxEvent->PutStatus = event.Status;
+ if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) {
+ TxEvent->PutStatus = NKikimrProto::ERROR;
+ }
+ }
+
+ ++NumRead;
+ if (NumRead == Blobs.size()) {
+ EvictPortions(ctx);
+ Clear();
+ }
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ Y_UNUSED(ctx);
+ Become(&TThis::StateWait);
+ }
+
+ STFUNC(StateWait) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvPrivate::TEvEviction, Handle);
+ HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+ default:
+ break;
+ }
+ }
+
+private:
+ ui64 TabletId;
+ TActorId Parent;
+ TActorId BlobCacheActorId;
+ std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
+ THashMap<TBlobRange, TString> Blobs;
+ ui32 NumRead{0};
+
+ void Clear() {
+ Blobs.clear();
+ NumRead = 0;
+ }
+
+ void SendReadRequest(const TActorContext&, std::vector<NBlobCache::TBlobRange>&& ranges) {
+ if (ranges.empty()) {
+ return;
+ }
+
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), false));
+ }
+
+ void EvictPortions(const TActorContext& ctx) {
+ Y_VERIFY(TxEvent);
+ if (TxEvent->PutStatus != NKikimrProto::EReplyStatus::UNKNOWN) {
+ LOG_S_INFO("Portions eviction not started at tablet " << TabletId);
+ ctx.Send(Parent, TxEvent.release());
+ return;
+ }
+
+ LOG_S_DEBUG("Portions eviction started at tablet " << TabletId);
+ {
+ TCpuGuard guard(TxEvent->ResourceUsage);
+
+ TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
+
+ TxEvent->Blobs = NOlap::TColumnEngineForLogs::EvictBlobs(TxEvent->IndexInfo, TxEvent->IndexChanges);
+ if (TxEvent->Blobs.empty()) {
+ TxEvent->PutStatus = NKikimrProto::OK;
+ }
+ }
+ ui32 blobsSize = TxEvent->Blobs.size();
+ ctx.Send(Parent, TxEvent.release());
+
+ LOG_S_DEBUG("Portions eviction finished (" << blobsSize << " new blobs) at tablet " << TabletId);
+ //Die(ctx); // It's alive till tablet's death
+ }
+};
+
+IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent) {
+ return new TEvictionActor(tabletId, parent);
+}
+
+}
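
Note on the read-request batching in the new eviction actor: TEvictionActor::Handle assumes that portionInfo.Records lists ranges from the same blob contiguously, and it flushes the accumulated batch as soon as the BlobId changes. A minimal standalone sketch of that grouping (TBlobId, TRange and GroupByBlob are simplified stand-ins for illustration, not the real NBlobCache types):

    #include <cstdint>
    #include <vector>

    struct TBlobId { uint64_t Id = 0; bool operator!=(const TBlobId& other) const { return Id != other.Id; } };
    struct TRange  { TBlobId BlobId; uint32_t Offset = 0; uint32_t Size = 0; };

    // Split an ordered record list into per-blob batches, mirroring the flush-on-BlobId-change loop above.
    std::vector<std::vector<TRange>> GroupByBlob(const std::vector<TRange>& records) {
        std::vector<std::vector<TRange>> batches;
        std::vector<TRange> current;
        for (const auto& range : records) {
            if (!current.empty() && current.back().BlobId != range.BlobId) {
                batches.push_back(std::move(current)); // next range belongs to a different blob: flush
                current = {};
            }
            current.push_back(range);
        }
        if (!current.empty()) {
            batches.push_back(std::move(current));     // trailing batch
        }
        return batches;
    }

Each resulting batch corresponds to one TEvReadBlobRangeBatch request, so all ranges of a blob are fetched together instead of issuing one read per column record.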
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 00e3143fc6..e2723fafb9 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -1119,7 +1119,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
auto batchStats = scan->ArrowBatch;
UNIT_ASSERT(batchStats);
// Cerr << batchStats->ToString() << Endl;
- UNIT_ASSERT_VALUES_EQUAL(batchStats->num_rows(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(batchStats->num_rows(), 5);
for (ui32 i = 0; i < batchStats->num_rows(); ++i) {
auto paths = batchStats->GetColumnByName("PathId");
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index d2bb656e92..1511c3219b 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -42,18 +42,24 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s
return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS);
}
-bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
- const std::string& columnName, i64 seconds) {
+std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TString& blob, const TString& srtSchema,
+ const std::string& columnName)
+{
auto schema = NArrow::DeserializeSchema(srtSchema);
auto batch = NArrow::DeserializeBatch(blob, schema);
UNIT_ASSERT(batch);
+ std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
+ UNIT_ASSERT(array);
+ return std::static_pointer_cast<arrow::TimestampArray>(array);
+}
+
+bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
+ const std::string& columnName, i64 seconds) {
auto expected = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO));
UNIT_ASSERT_VALUES_EQUAL(expected.value, seconds * 1000 * 1000);
- std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
- UNIT_ASSERT(array);
- auto tsCol = std::static_pointer_cast<arrow::TimestampArray>(array);
+ auto tsCol = GetTimestampColumn(blob, srtSchema, columnName);
UNIT_ASSERT(tsCol);
UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize);
@@ -67,7 +73,32 @@ bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
return true;
}
-void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}) {
+std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize) {
+ UNIT_ASSERT(ts.size() == 2);
+
+ TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema);
+ UNIT_ASSERT(data1.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
+ UNIT_ASSERT(data1.size() < 7 * 1024 * 1024);
+
+ TString data2 = MakeTestBlob({overlapSize, overlapSize + portionSize}, testYdbSchema);
+ UNIT_ASSERT(data2.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
+ UNIT_ASSERT(data2.size() < 7 * 1024 * 1024);
+
+ auto schema = NArrow::MakeArrowSchema(testYdbSchema);
+ auto batch1 = UpdateColumn(NArrow::DeserializeBatch(data1, schema), TTestSchema::DefaultTtlColumn, ts[0]);
+ auto batch2 = UpdateColumn(NArrow::DeserializeBatch(data2, schema), TTestSchema::DefaultTtlColumn, ts[1]);
+
+ std::vector<TString> data;
+ data.emplace_back(NArrow::SerializeBatchNoCompression(batch1));
+ data.emplace_back(NArrow::SerializeBatchNoCompression(batch2));
+ return data;
+}
+
+// ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
+// ts[1] = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021
+void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
+ std::vector<ui64> ts = {1600000000, 1620000000})
+{
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -88,12 +119,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
ui64 planStep = 1000000000; // greater than delays
ui64 txId = 100;
- ui64 ts1 = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
- ui64 ts2 = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021
+ UNIT_ASSERT(ts.size() == 2);
ui32 ttlSec = TInstant::Now().Seconds(); // disable internal ttl
if (internal) {
- ttlSec -= (ts1 + ts2) / 2; // enable internal ttl between ts1 and ts2
+ ttlSec -= (ts[0] + ts[1]) / 2; // enable internal ttl between ts[0] and ts[1]
}
if (spec.HasTiers()) {
spec.Tiers[0].SetTtl(ttlSec);
@@ -108,31 +138,14 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
//
- static const ui32 portionSize = 80 * 1000;
- static const ui32 overlapSize = 40 * 1000;
-
- TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema);
- UNIT_ASSERT(data1.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
- UNIT_ASSERT(data1.size() < 7 * 1024 * 1024);
-
- TString data2 = MakeTestBlob({overlapSize, overlapSize + portionSize}, testYdbSchema);
- UNIT_ASSERT(data2.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
- UNIT_ASSERT(data2.size() < 7 * 1024 * 1024);
-
- auto schema = NArrow::MakeArrowSchema(testYdbSchema);
- auto batch1 = NArrow::DeserializeBatch(data1, schema);
- auto batch2 = NArrow::DeserializeBatch(data2, schema);
-
- data1 = NArrow::SerializeBatchNoCompression(UpdateColumn(batch1, TTestSchema::DefaultTtlColumn, ts1));
- data2 = NArrow::SerializeBatchNoCompression(UpdateColumn(batch2, TTestSchema::DefaultTtlColumn, ts2));
-
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data1));
- ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
- PlanCommit(runtime, sender, ++planStep, txId);
-
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data2));
- ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
- PlanCommit(runtime, sender, ++planStep, txId);
+ ui32 portionSize = 80 * 1000;
+ auto blobs = MakeData(ts, portionSize, portionSize / 2);
+ UNIT_ASSERT_EQUAL(blobs.size(), 2);
+ for (auto& data : blobs) {
+ UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
+ ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+ PlanCommit(runtime, sender, ++planStep, txId);
+ }
// TODO: write into path 2 (no ttl)
@@ -143,7 +156,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
if (internal) {
TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts1 + 1);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1);
}
TAutoPtr<IEventHandle> handle;
@@ -170,11 +183,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
UNIT_ASSERT(resRead.GetData().size() > 0);
auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts2));
+ UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts[1]));
}
// Alter TTL
- ttlSec = TInstant::Now().Seconds() - (ts2 + 1);
+ ttlSec = TInstant::Now().Seconds() - (ts[1] + 1);
if (spec.HasTiers()) {
spec.Tiers[0].SetTtl(ttlSec);
} else {
@@ -189,7 +202,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
if (internal) {
TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts2 + 1);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1);
}
{
@@ -217,14 +230,14 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
- UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data1));
+ UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0]));
ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
PlanCommit(runtime, sender, ++planStep, txId);
if (internal) {
TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts1 - 1);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1);
}
{
@@ -245,8 +258,151 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {})
UNIT_ASSERT(resRead.GetData().size() > 0);
auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts1));
+ UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts[0]));
+ }
+}
+
+std::vector<std::pair<std::shared_ptr<arrow::TimestampArray>, ui64>>
+TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTestSchema::TTableSpecials>& specs) {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime,
+ CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::COLUMNSHARD),
+ &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ //
+
+ ui64 metaShard = TTestTxConfig::TxTablet1;
+ ui64 writeId = 0;
+ ui64 tableId = 1;
+ ui64 planStep = 1000000000; // greater than delays
+ ui64 txId = 100;
+
+ UNIT_ASSERT(specs.size() > 0);
+ bool ok = ProposeSchemaTx(runtime, sender,
+ TTestSchema::CreateTableTxBody(tableId, testYdbSchema, specs[0]),
+ {++planStep, ++txId});
+ UNIT_ASSERT(ok);
+ PlanSchemaTx(runtime, sender, {planStep, txId});
+
+ for (auto& data : blobs) {
+ UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
+ ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
+ PlanCommit(runtime, sender, ++planStep, txId);
+ }
+
+ if (reboots) {
+ RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
+ }
+
+ TAutoPtr<IEventHandle> handle;
+
+ std::vector<std::pair<std::shared_ptr<arrow::TimestampArray>, ui64>> resColumns;
+ resColumns.reserve(specs.size());
+
+ for (ui32 i = 0; i < specs.size(); ++i) {
+ if (i) {
+ ui32 version = i + 1;
+ ok = ProposeSchemaTx(runtime, sender,
+ TTestSchema::AlterTableTxBody(tableId, version, specs[i]),
+ {++planStep, ++txId});
+ UNIT_ASSERT(ok);
+ PlanSchemaTx(runtime, sender, {planStep, txId});
+ }
+
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
+
+ // Read
+
+ --planStep;
+ auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
+ Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn);
+
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& resRead = Proto(event);
+ UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
+ UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
+ UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
+ UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
+
+ if (resRead.GetData().size()) {
+ auto& meta = resRead.GetMeta();
+ auto& schema = meta.GetSchema();
+ auto tsColumn = GetTimestampColumn(resRead.GetData(), schema, TTestSchema::DefaultTtlColumn);
+ UNIT_ASSERT(tsColumn);
+
+ UNIT_ASSERT(meta.HasReadStats());
+ auto& readStats = meta.GetReadStats();
+ ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage
+
+ resColumns.emplace_back(tsColumn, numBytes);
+ } else {
+ resColumns.emplace_back(nullptr, 0);
+ }
}
+
+ return resColumns;
+}
+
+void TestTiersT1(bool reboots) {
+ std::vector<ui64> ts = {1600000000, 1620000000};
+ ui64 nowSec = TInstant::Now().Seconds();
+
+ TTestSchema::TTableSpecials spec;
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0"));
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1"));
+ spec.Tiers.back().SetCodec("zstd");
+
+ std::vector<TTestSchema::TTableSpecials> alters(4, spec);
+
+ ui64 allowBoth = nowSec - ts[0] + 600;
+ ui64 allowOne = nowSec - ts[1] + 600;
+ ui64 allowNone = nowSec - ts[1] - 600;
+
+ alters[0].Tiers[0].SetTtl(allowBoth); // tier0 allows/has: data[0], data[1]
+ alters[0].Tiers[1].SetTtl(allowBoth); // tier1 allows: data[0], data[1], has: nothing
+
+ alters[1].Tiers[0].SetTtl(allowOne); // tier0 allows/has: data[1]
+ alters[1].Tiers[1].SetTtl(allowBoth); // tier1 allows: data[0], data[1], has: data[0]
+
+ alters[2].Tiers[0].SetTtl(allowNone); // tier0 allows/has: nothing
+ alters[2].Tiers[1].SetTtl(allowOne); // tier1 allows/has: data[1]
+
+ alters[3].Tiers[0].SetTtl(allowNone); // tier0 allows/has: nothing
+ alters[3].Tiers[1].SetTtl(allowNone); // tier1 allows/has: nothing
+
+ ui32 portionSize = 80 * 1000;
+ ui32 overlapSize = 40 * 1000;
+ std::vector<TString> blobs = MakeData(ts, portionSize, overlapSize);
+
+ auto columns = TestTiers(reboots, blobs, alters);
+
+ UNIT_ASSERT_EQUAL(columns.size(), 4);
+ UNIT_ASSERT(columns[0].first);
+ UNIT_ASSERT(columns[1].first);
+ UNIT_ASSERT(columns[2].first);
+ UNIT_ASSERT(!columns[3].first);
+ UNIT_ASSERT(columns[0].second);
+ UNIT_ASSERT(columns[1].second);
+ UNIT_ASSERT(columns[2].second);
+ UNIT_ASSERT(!columns[3].second);
+
+ UNIT_ASSERT_EQUAL(columns[0].first->length(), 2 * portionSize - overlapSize);
+ UNIT_ASSERT_EQUAL(columns[0].first->length(), columns[1].first->length());
+ UNIT_ASSERT_EQUAL(columns[2].first->length(), portionSize);
+
+ Cerr << "read bytes: " << columns[0].second << ", " << columns[1].second << ", " << columns[2].second << "\n";
+ UNIT_ASSERT_GT(columns[0].second, columns[1].second);
}
void TestDrop(bool reboots) {
@@ -380,6 +536,15 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
TestTtl(true, false, specs);
}
+ Y_UNIT_TEST(Tiers) {
+ TestTiersT1(false);
+ }
+
+ Y_UNIT_TEST(RebootTiers) {
+ NColumnShard::gAllowLogBatchingDefaultValue = false;
+ TestTiersT1(true);
+ }
+
Y_UNIT_TEST(Drop) {
TestDrop(false);
}
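
The tier tests above rest on two pieces of arithmetic. MakeData writes keys [0, portionSize) and [overlapSize, overlapSize + portionSize), so the two blobs together cover overlapSize + portionSize distinct keys; with overlapSize = portionSize / 2, as both tests use, that equals the 2 * portionSize - overlapSize the assertions check. And a row survives a tier with TTL ttlSec only while its timestamp stays newer than the eviction border nowSec - ttlSec, which is how allowBoth/allowOne/allowNone are chosen. A minimal sketch of both calculations (DistinctRows, Survives and the fixed "now" value are illustrative only, not part of the patch):

    #include <cstdint>

    // MakeData writes keys [0, p) and [start, start + p); since start <= p the union is [0, start + p).
    constexpr uint64_t DistinctRows(uint64_t p, uint64_t start) { return start + p; }

    // A row with timestamp rowTs (seconds) survives a tier with TTL ttlSec if it is newer than now - ttlSec.
    constexpr bool Survives(uint64_t rowTs, uint64_t nowSec, uint64_t ttlSec) { return rowTs > nowSec - ttlSec; }

    constexpr uint64_t ts0 = 1600000000, ts1 = 1620000000, now = 1640000000;

    static_assert(DistinctRows(80000, 40000) == 120000, "equals 2 * portionSize - overlapSize when start == p / 2");
    static_assert(Survives(ts0, now, now - ts0 + 600) && Survives(ts1, now, now - ts0 + 600), "allowBoth");
    static_assert(!Survives(ts0, now, now - ts1 + 600) && Survives(ts1, now, now - ts1 + 600), "allowOne");
    static_assert(!Survives(ts0, now, now - ts1 - 600) && !Survives(ts1, now, now - ts1 - 600), "allowNone");

This is why columns[0] and columns[1] report 2 * portionSize - overlapSize rows, columns[2] reports portionSize rows (only the younger portion remains), and columns[3] is empty.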
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index a02d84e73f..1a96ba7b44 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -213,7 +213,17 @@ public:
TString accumulatedBlob;
TVector<std::pair<size_t, TString>> recordsInBlob;
- for (auto& portionInfo : indexChanges->AppendedPortions) {
+ size_t portionsToWrite = indexChanges->AppendedPortions.size();
+ bool appended = true;
+ if (indexChanges->PortionsToEvict.size()) {
+ Y_VERIFY(portionsToWrite == 0);
+ portionsToWrite = indexChanges->PortionsToEvict.size();
+ appended = false;
+ }
+
+ for (size_t pos = 0; pos < portionsToWrite; ++pos) {
+ auto& portionInfo = appended ? indexChanges->AppendedPortions[pos]
+ : indexChanges->PortionsToEvict[pos].first;
auto& records = portionInfo.Records;
accumulatedBlob.clear();
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index c4c857fe7d..e1a38023b3 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -23,6 +23,7 @@ SRCS(
columnshard_impl.cpp
columnshard_common.cpp
compaction_actor.cpp
+ eviction_actor.cpp
indexing_actor.cpp
read_actor.cpp
write_actor.cpp
diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp
index b3aea0b90f..2a712d3b0f 100644
--- a/ydb/core/tx/schemeshard/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap.cpp
@@ -538,4 +538,9 @@ Y_UNIT_TEST_SUITE(TOlap) {
}
)", {NKikimrScheme::StatusInvalidParameter});
}
+
+ // TODO: AlterTiers
+ // negatives for store: disallow alters
+ // negatives for table: wrong tiers count, wrong tiers, wrong eviction column, wrong eviction values,
+ // different TTL columns in tiers
}