aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-05 15:40:19 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-05 15:40:19 +0300
commit425a6cff6fa3ac1cabb21913ce488cf2001756a2 (patch)
tree927920b4600bfe542c5723e2eee34d813a9ff3ce
parent673cd6cf5505d2db27f94c86525520eabcd8ffac (diff)
downloadydb-425a6cff6fa3ac1cabb21913ce488cf2001756a2.tar.gz
additional signals, logs and test
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp11
-rw-r--r--ydb/core/testlib/common_helper.cpp54
-rw-r--r--ydb/core/testlib/common_helper.h1
-rw-r--r--ydb/core/testlib/cs_helper.cpp16
-rw-r--r--ydb/core/testlib/cs_helper.h10
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp2
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp6
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp338
9 files changed, 419 insertions, 20 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 9ef0d56c80..21fc452453 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -796,14 +796,19 @@ bool ArrayScalarsEqual(const std::shared_ptr<arrow::Array>& lhs, const std::shar
}
bool ReserveData(arrow::ArrayBuilder& builder, const size_t size) {
+ arrow::Status result = arrow::Status::OK();
if (builder.type()->id() == arrow::Type::BINARY) {
arrow::BaseBinaryBuilder<arrow::BinaryType>& bBuilder = static_cast<arrow::BaseBinaryBuilder<arrow::BinaryType>&>(builder);
- return bBuilder.ReserveData(size).ok();
+ result = bBuilder.ReserveData(size);
} else if (builder.type()->id() == arrow::Type::STRING) {
arrow::BaseBinaryBuilder<arrow::StringType>& bBuilder = static_cast<arrow::BaseBinaryBuilder<arrow::StringType>&>(builder);
- return bBuilder.ReserveData(size).ok();
+ result = bBuilder.ReserveData(size);
}
- return true;
+
+ if (!result.ok()) {
+ AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "ReserveData")("error", result.ToString());
+ }
+ return result.ok();
}
bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result,
diff --git a/ydb/core/testlib/common_helper.cpp b/ydb/core/testlib/common_helper.cpp
index 8b285542af..3274d9b15a 100644
--- a/ydb/core/testlib/common_helper.cpp
+++ b/ydb/core/testlib/common_helper.cpp
@@ -20,6 +20,60 @@ void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) {
runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(sender);
}
+void THelper::StartScanRequest(const TString& request, const bool expectSuccess, TVector<THashMap<TString, NYdb::TValue>>* result) const {
+ NYdb::NTable::TTableClient tClient(Server.GetDriver(),
+ NYdb::NTable::TClientSettings().UseQueryCache(false).AuthToken("root@builtin"));
+ auto expectation = expectSuccess;
+ bool resultReady = false;
+ TVector<THashMap<TString, NYdb::TValue>> rows;
+ std::optional<NYdb::NTable::TScanQueryPartIterator> scanIterator;
+ tClient.StreamExecuteScanQuery(request).Subscribe([&scanIterator](NThreading::TFuture<NYdb::NTable::TScanQueryPartIterator> f) {
+ scanIterator = f.GetValueSync();
+ });
+ const TInstant start = TInstant::Now();
+ while (!resultReady && start + TDuration::Seconds(60) > TInstant::Now()) {
+ Server.GetRuntime()->SimulateSleep(TDuration::Seconds(1));
+ if (scanIterator && !resultReady) {
+ scanIterator->ReadNext().Subscribe([&](NThreading::TFuture<NYdb::NTable::TScanQueryPart> streamPartFuture) {
+ NYdb::NTable::TScanQueryPart streamPart = streamPartFuture.GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ resultReady = true;
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ } else {
+ UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(), "Unexpected empty scan query response.");
+
+ if (streamPart.HasQueryStats()) {
+ auto plan = streamPart.GetQueryStats().GetPlan();
+ NJson::TJsonValue jsonValue;
+ if (plan) {
+ UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, &jsonValue));
+ Cerr << jsonValue << Endl;
+ }
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+ NYdb::TResultSetParser rsParser(resultSet);
+ while (rsParser.TryNextRow()) {
+ THashMap<TString, NYdb::TValue> row;
+ for (size_t ci = 0; ci < resultSet.ColumnsCount(); ++ci) {
+ row.emplace(resultSet.GetColumnsMeta()[ci].Name, rsParser.GetValue(ci));
+ Cerr << resultSet.GetColumnsMeta()[ci].Name << "/" << rsParser.GetValue(ci).GetProto().DebugString() << Endl;
+ }
+ rows.emplace_back(std::move(row));
+ }
+ }
+ }
+ });
+ }
+ }
+ Cerr << "REQUEST=" << request << ";EXPECTATION=" << expectation << Endl;
+ UNIT_ASSERT(resultReady);
+ if (result) {
+ *result = rows;
+ }
+}
+
void THelper::StartDataRequest(const TString& request, const bool expectSuccess, TString* result) const {
NYdb::NTable::TTableClient tClient(Server.GetDriver(),
NYdb::NTable::TClientSettings().UseQueryCache(false).AuthToken("root@builtin"));
diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h
index 0abc659b97..38d66ee2ea 100644
--- a/ydb/core/testlib/common_helper.h
+++ b/ydb/core/testlib/common_helper.h
@@ -19,6 +19,7 @@ public:
void DropTable(const TString& tablePath);
+ void StartScanRequest(const TString& request, const bool expectSuccess, TVector<THashMap<TString, NYdb::TValue>>* result) const;
void StartDataRequest(const TString& request, const bool expectSuccess = true, TString* result = nullptr) const;
void StartSchemaRequest(const TString& request, const bool expectSuccess = true, const bool waiting = true) const;
};
diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp
index abf45d13ec..b090fd6a4f 100644
--- a/ydb/core/testlib/cs_helper.cpp
+++ b/ydb/core/testlib/cs_helper.cpp
@@ -98,8 +98,8 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_pt
runtime->DispatchEvents(options);
}
-void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const {
- auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount);
+void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs) const {
+ auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount, tsStepUs);
SendDataViaActorSystem(testTable, batch);
}
@@ -118,7 +118,7 @@ std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() const {
return std::make_shared<arrow::Schema>(std::move(fields));
}
-std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const {
+std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs) const {
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool());
@@ -136,7 +136,7 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui
for (size_t i = 0; i < rowCount; ++i) {
std::string uid("uid_" + std::to_string(tsBegin + i));
std::string message("some prefix " + std::string(1024 + i % 200, 'x'));
- Y_VERIFY(b1.Append(tsBegin + i).ok());
+ Y_VERIFY(b1.Append(tsBegin + i * tsStepUs).ok());
Y_VERIFY(b2.Append(std::to_string(pathIdBegin + i)).ok());
Y_VERIFY(b3.Append(uid).ok());
Y_VERIFY(b4.Append(i % 5).ok());
@@ -325,7 +325,7 @@ std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() const {
});
}
-std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount) const {
+std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount, const ui32 tsStepUs) const {
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
UNIT_ASSERT(schema);
UNIT_ASSERT(schema->num_fields());
@@ -353,7 +353,7 @@ std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64
break;
}
case arrow::Type::TIMESTAMP: {
- UNIT_ASSERT(builders->GetFieldAs<arrow::TimestampBuilder>(col)->Append(value).ok());
+ UNIT_ASSERT(builders->GetFieldAs<arrow::TimestampBuilder>(col)->Append(begin + row * tsStepUs).ok());
break;
}
case arrow::Type::BINARY: {
@@ -395,10 +395,10 @@ std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() const {
}
std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch() const {
- return TestArrowBatch(0, 0, 10);
+ return TestArrowBatch(0, 0, 10, 1);
}
-std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount) const {
+std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount, const ui32 /*tsStepUs*/) const {
rowCount = 10;
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h
index 0ed78d368f..2548e9905a 100644
--- a/ydb/core/testlib/cs_helper.h
+++ b/ydb/core/testlib/cs_helper.h
@@ -14,10 +14,10 @@ public:
using TBase::TBase;
void CreateTestOlapStore(TActorId sender, TString scheme);
void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme);
- void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const;
+ void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const;
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) const;
- virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const = 0;
+ virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const = 0;
};
class THelper: public THelperSchemaless {
@@ -59,7 +59,7 @@ public:
}
virtual TString GetTestTableSchema() const;
- virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const override;
+ virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const override;
};
class TCickBenchHelper: public THelperSchemaless {
@@ -180,7 +180,7 @@ public:
KeyColumnNames: ["EventTime", "EventDate", "CounterID", "UserID", "WatchID"]
)";
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount) const override;
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount, const ui32 tsStepUs = 1) const override;
};
class TTableWithNullsHelper: public THelperSchemaless {
@@ -201,7 +201,7 @@ public:
KeyColumnNames: "id"
)";
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10) const override;
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10, const ui32 tsStepUs = 1) const override;
std::shared_ptr<arrow::RecordBatch> TestArrowBatch() const;
};
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 3982f4318c..196a5b4df3 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -220,6 +220,7 @@ private:
};
void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "EnqueueProgressTx")("tablet_id", TabletID());
if (!ProgressTxInFlight) {
ProgressTxInFlight = true;
Execute(new TTxProgressTx(this), ctx);
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 929ddf1be5..a11ee3c94f 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2735,7 +2735,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 planStep = 5000000;
ui64 txId = 1000;
- // Ovewrite the same data multiple times to produce multiple portions at different timestamps
+ // Overwrite the same data multiple times to produce multiple portions at different timestamps
ui32 numWrites = 14;
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index 986a9d395c..74c452a487 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -223,7 +223,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT(ts.size() == 2);
- ui32 ttlSec = TInstant::Now().Seconds(); // disable internal tll
+ ui32 ttlSec = TAppData::TimeProvider->Now().Seconds(); // disable internal tll
if (internal) {
ttlSec -= (ts[0] + ts[1]) / 2; // enable internal ttl between ts1 and ts2
}
@@ -288,7 +288,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
// Alter TTL
- ttlSec = TInstant::Now().Seconds() - (ts[1] + 1);
+ ttlSec = TAppData::TimeProvider->Now().Seconds() - (ts[1] + 1);
if (spec.HasTiers()) {
spec.Tiers[0].EvictAfter = TDuration::Seconds(ttlSec);
} else {
@@ -743,7 +743,7 @@ std::vector<std::pair<ui32, ui64>> TestTiersAndTtl(const TTestSchema::TTableSpec
blobs.emplace_back(std::move(preload[1]));
}
- TInstant now = TInstant::Now();
+ TInstant now = TAppData::TimeProvider->Now();
TDuration allowBoth = TDuration::Seconds(now.Seconds() - ts[0] + 600);
TDuration allowOne = TDuration::Seconds(now.Seconds() - ts[1] + 600);
TDuration allowNone = TDuration::Seconds(now.Seconds() - ts[1] - 600);
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 33225ffb37..ebd5de2c4a 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -63,6 +63,45 @@ public:
}
)", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
}
+
+ void CreateTestOlapTableWithTTL(TString tableName = "olapTable", ui32 tableShardsCount = 3,
+ TString storeName = "olapStore", ui32 storeShardsCount = 4,
+ TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
+
+ TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
+ CreateTestOlapStore(sender, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ SchemaPresets {
+ Name: "default"
+ Schema {
+ %s
+ }
+ }
+ )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data()));
+
+ TString shardingColumns = "[\"timestamp\", \"uid\"]";
+ if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
+ shardingColumns = "[\"uid\"]";
+ }
+
+ TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ TtlSettings: {
+ Enabled: {
+ ColumnName : "timestamp"
+ ExpireAfterSeconds : 86400
+ }
+ }
+ Sharding {
+ HashSharding {
+ Function: %s
+ Columns: %s
+ }
+ }
+ )", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
+ }
};
@@ -551,5 +590,304 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
#endif
}
+ std::optional<NYdb::TValue> GetValueResult(const THashMap<TString, NYdb::TValue>& hMap, const TString& fName) {
+ auto it = hMap.find(fName);
+ if (it == hMap.end()) {
+ Cerr << fName << ": NOT_FOUND" << Endl;
+ return {};
+ } else {
+ Cerr << fName << ": " << it->second.GetProto().DebugString() << Endl;
+ return it->second;
+ }
+ }
+
+ class TGCSource {
+ private:
+ const ui64 TabletId;
+ const ui32 Channel;
+ public:
+ ui32 GetChannel() const {
+ return Channel;
+ }
+ TGCSource(const ui64 tabletId, const ui32 channel)
+ : TabletId(tabletId)
+ , Channel(channel)
+ {
+
+ }
+
+ bool operator<(const TGCSource& item) const {
+ return std::tie(TabletId, Channel) < std::tie(item.TabletId, item.Channel);
+ }
+
+ TString DebugString() const {
+ return TStringBuilder() << "tId=" << TabletId << ";c=" << Channel << ";";
+ }
+ };
+
+ class TCurrentBarrier {
+ private:
+ ui32 Generation = 0;
+ ui32 Step = 0;
+ public:
+ TCurrentBarrier() = default;
+
+ TCurrentBarrier(const ui32 gen, const ui32 step)
+ : Generation(gen)
+ , Step(step)
+ {
+
+ }
+
+ bool operator<(const TCurrentBarrier& b) const {
+ return std::tie(Generation, Step) < std::tie(b.Generation, b.Step);
+ }
+
+ bool IsDeprecated(const NKikimr::TLogoBlobID& id) const {
+ if (id.Generation() < Generation) {
+ return true;
+ }
+ if (id.Generation() > Generation) {
+ return false;
+ }
+
+ if (id.Step() < Step) {
+ return true;
+ }
+ return id.Generation() == Generation && id.Step() == Step;
+ }
+ };
+
+ class TBlobFlags {
+ private:
+ bool KeepFlag = false;
+ bool DontKeepFlag = false;
+ public:
+ bool IsRemovable() const {
+ return !KeepFlag || DontKeepFlag;
+ }
+ void Keep() {
+ KeepFlag = true;
+ }
+ void DontKeep() {
+ DontKeepFlag = true;
+ }
+ };
+
+ class TGCSourceData {
+ private:
+ i64 BytesSize = 0;
+ TCurrentBarrier Barrier;
+ std::map<NKikimr::TLogoBlobID, TBlobFlags> Blobs;
+ public:
+
+ TString DebugString() const {
+ return TStringBuilder() << "size=" << BytesSize << ";count=" << Blobs.size() << ";";
+ }
+
+ i64 GetSize() const {
+ return BytesSize;
+ }
+ void AddSize(const ui64 size) {
+ BytesSize += size;
+ }
+ void ReduceSize(const ui64 size) {
+ BytesSize -= size;
+ Y_VERIFY(BytesSize >= 0);
+ }
+ void SetBarrier(const TCurrentBarrier& b) {
+ Y_VERIFY(!(b < Barrier));
+ Barrier = b;
+ RefreshBarrier();
+ }
+
+ void AddKeep(const TLogoBlobID& id) {
+ auto it = Blobs.find(id);
+ if (it != Blobs.end()) {
+ it->second.Keep();
+ }
+ }
+
+ void AddDontKeep(const TLogoBlobID& id) {
+ auto it = Blobs.find(id);
+ if (it != Blobs.end()) {
+ it->second.DontKeep();
+ }
+ }
+
+ void AddBlob(const TLogoBlobID& id) {
+ Blobs[id] = TBlobFlags();
+ }
+
+ void RefreshBarrier() {
+ for (auto it = Blobs.begin(); it != Blobs.end();) {
+ if (Barrier.IsDeprecated(it->first) && it->second.IsRemovable()) {
+ ReduceSize(it->first.BlobSize());
+ it = Blobs.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+ };
+
+ class TBSDataCollector {
+ private:
+ std::map<TGCSource, TGCSourceData> Data;
+ public:
+ TGCSourceData& GetData(const TGCSource& id) {
+ return Data[id];
+ }
+ ui64 GetChannelSize(const ui32 channelId) const {
+ ui64 result = 0;
+ for (auto&& i : Data) {
+ if (i.first.GetChannel() == channelId) {
+ result += i.second.GetSize();
+ }
+ }
+ return result;
+ }
+ ui64 GetSize() const {
+ ui64 result = 0;
+ for (auto&& i : Data) {
+ result += i.second.GetSize();
+ }
+ return result;
+ }
+ TString StatusString() const {
+ std::map<ui32, TString> info;
+ for (auto&& i : Data) {
+ info[i.first.GetChannel()] += i.second.DebugString();
+ }
+ TStringBuilder sb;
+ for (auto&& i : info) {
+ sb << i.first << ":" << i.second << ";";
+ }
+ return sb;
+ }
+
+ };
+
+ Y_UNIT_TEST(TTLUsage) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(true)
+ .SetEnableBackgroundTasks(true)
+ .SetForceColumnTablesCompositeMarks(true);
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+// runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
+// runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+// runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
+ runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_DEBUG);
+// runtime.SetLogPriority(NKikimrServices::BG_TASKS, NLog::PRI_DEBUG);
+ // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
+
+ TLocalHelper lHelper(*server);
+ lHelper.CreateTestOlapTableWithTTL("olapTable", 1);
+ Cerr << "Wait tables" << Endl;
+ runtime.SimulateSleep(TDuration::Seconds(20));
+ Cerr << "Initialization tables" << Endl;
+ const ui32 numRecords = 600000;
+ auto batch = lHelper.TestArrowBatch(0, TInstant::Zero().GetValue(), 600000, 1000000);
+
+ ui32 gcCounter = 0;
+ TBSDataCollector bsCollector;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (auto* msg = dynamic_cast<TEvBlobStorage::TEvCollectGarbageResult*>(ev->StaticCastAsLocal<IEventBase>())) {
+ Y_VERIFY(msg->Status == NKikimrProto::EReplyStatus::OK);
+ }
+ if (auto* msg = dynamic_cast<TEvBlobStorage::TEvCollectGarbage*>(ev->StaticCastAsLocal<IEventBase>())) {
+ TGCSource gcSource(msg->TabletId, msg->Channel);
+ auto& gcSourceData = bsCollector.GetData(gcSource);
+ if (msg->Keep) {
+ for (auto&& i : *msg->Keep) {
+ gcSourceData.AddKeep(i);
+ }
+ }
+ if (msg->DoNotKeep) {
+ for (auto&& i : *msg->DoNotKeep) {
+ gcSourceData.AddDontKeep(i);
+ }
+ }
+
+ Y_VERIFY(!msg->Hard);
+ if (msg->Collect) {
+ gcSourceData.SetBarrier(TCurrentBarrier(msg->CollectGeneration, msg->CollectStep));
+ } else {
+ gcSourceData.RefreshBarrier();
+ }
+ Cerr << "TEvBlobStorage::TEvCollectGarbage " << gcSource.DebugString() << ":" << ++gcCounter << "/" << bsCollector.StatusString() << Endl;
+ }
+ if (auto* msg = dynamic_cast<TEvBlobStorage::TEvPut*>(ev->StaticCastAsLocal<IEventBase>())) {
+ TGCSource gcSource(msg->Id.TabletID(), msg->Id.Channel());
+ auto& gcSourceData = bsCollector.GetData(gcSource);
+ gcSourceData.AddBlob(msg->Id);
+ gcSourceData.AddSize(msg->Id.BlobSize());
+ Cerr << "TEvBlobStorage::TEvPut " << gcSource.DebugString() << ":" << gcCounter << "/" << bsCollector.StatusString() << Endl;
+ }
+ return false;
+ };
+ runtime.SetEventFilter(captureEvents);
+
+ lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", batch);
+
+ {
+ TVector<THashMap<TString, NYdb::TValue>> result;
+ lHelper.StartScanRequest("SELECT MAX(timestamp) as a, MIN(timestamp) as b, COUNT(*) as c FROM `/Root/olapStore/olapTable`", true, &result);
+ UNIT_ASSERT(result.size() == 1);
+ UNIT_ASSERT(result.front().size() == 3);
+ UNIT_ASSERT(GetValueResult(result.front(), "c")->GetProto().uint64_value() == 600000);
+ UNIT_ASSERT(GetValueResult(result.front(), "a")->GetProto().uint64_value() == 599999000000);
+ UNIT_ASSERT(GetValueResult(result.front(), "b")->GetProto().uint64_value() == 0);
+ }
+ const ui32 reduceStepsCount = 1;
+ for (ui32 i = 0; i < reduceStepsCount; ++i) {
+ runtime.AdvanceCurrentTime(TDuration::Seconds(numRecords * (i + 1) / reduceStepsCount + 500000));
+ const TInstant start = TInstant::Now();
+ const ui64 purposeSize = 800000000.0 * (1 - 1.0 * (i + 1) / reduceStepsCount);
+ const ui64 purposeRecords = numRecords * (1 - 1.0 * (i + 1) / reduceStepsCount);
+ const ui64 purposeMinTimestamp = numRecords * 1.0 * (i + 1) / reduceStepsCount * 1000000;
+ while (bsCollector.GetChannelSize(2) > purposeSize && TInstant::Now() - start < TDuration::Seconds(60)) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Cerr << bsCollector.GetChannelSize(2) << "/" << purposeSize << Endl;
+
+ TVector<THashMap<TString, NYdb::TValue>> result;
+ lHelper.StartScanRequest("SELECT MIN(timestamp) as b, COUNT(*) as c FROM `/Root/olapStore/olapTable`", true, &result);
+ UNIT_ASSERT(result.size() == 1);
+ UNIT_ASSERT(result.front().size() == 2);
+ UNIT_ASSERT(GetValueResult(result.front(), "c")->GetProto().uint64_value() == purposeRecords);
+ if (purposeRecords) {
+ UNIT_ASSERT(GetValueResult(result.front(), "b")->GetProto().uint64_value() == purposeMinTimestamp);
+ }
+
+ Y_VERIFY(bsCollector.GetChannelSize(2) <= purposeSize);
+ }
+
+ {
+ TVector<THashMap<TString, NYdb::TValue>> result;
+ lHelper.StartScanRequest("SELECT COUNT(*) FROM `/Root/olapStore/olapTable`", true, &result);
+ UNIT_ASSERT(result.front().begin()->second.GetProto().uint64_value() == 0);
+ }
+ }
+
}
}