diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-05 15:40:19 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-05 15:40:19 +0300 |
commit | 425a6cff6fa3ac1cabb21913ce488cf2001756a2 (patch) | |
tree | 927920b4600bfe542c5723e2eee34d813a9ff3ce | |
parent | 673cd6cf5505d2db27f94c86525520eabcd8ffac (diff) | |
download | ydb-425a6cff6fa3ac1cabb21913ce488cf2001756a2.tar.gz |
additional signals, logs and test
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.cpp | 11 | ||||
-rw-r--r-- | ydb/core/testlib/common_helper.cpp | 54 | ||||
-rw-r--r-- | ydb/core/testlib/common_helper.h | 1 | ||||
-rw-r--r-- | ydb/core/testlib/cs_helper.cpp | 16 | ||||
-rw-r--r-- | ydb/core/testlib/cs_helper.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/tiering/ut/ut_tiers.cpp | 338 |
9 files changed, 419 insertions, 20 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 9ef0d56c80..21fc452453 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -796,14 +796,19 @@ bool ArrayScalarsEqual(const std::shared_ptr<arrow::Array>& lhs, const std::shar } bool ReserveData(arrow::ArrayBuilder& builder, const size_t size) { + arrow::Status result = arrow::Status::OK(); if (builder.type()->id() == arrow::Type::BINARY) { arrow::BaseBinaryBuilder<arrow::BinaryType>& bBuilder = static_cast<arrow::BaseBinaryBuilder<arrow::BinaryType>&>(builder); - return bBuilder.ReserveData(size).ok(); + result = bBuilder.ReserveData(size); } else if (builder.type()->id() == arrow::Type::STRING) { arrow::BaseBinaryBuilder<arrow::StringType>& bBuilder = static_cast<arrow::BaseBinaryBuilder<arrow::StringType>&>(builder); - return bBuilder.ReserveData(size).ok(); + result = bBuilder.ReserveData(size); } - return true; + + if (!result.ok()) { + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "ReserveData")("error", result.ToString()); + } + return result.ok(); } bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, diff --git a/ydb/core/testlib/common_helper.cpp b/ydb/core/testlib/common_helper.cpp index 8b285542af..3274d9b15a 100644 --- a/ydb/core/testlib/common_helper.cpp +++ b/ydb/core/testlib/common_helper.cpp @@ -20,6 +20,60 @@ void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) { runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(sender); } +void THelper::StartScanRequest(const TString& request, const bool expectSuccess, TVector<THashMap<TString, NYdb::TValue>>* result) const { + NYdb::NTable::TTableClient tClient(Server.GetDriver(), + NYdb::NTable::TClientSettings().UseQueryCache(false).AuthToken("root@builtin")); + auto expectation = expectSuccess; + bool resultReady = false; + TVector<THashMap<TString, NYdb::TValue>> rows; + std::optional<NYdb::NTable::TScanQueryPartIterator> scanIterator; + tClient.StreamExecuteScanQuery(request).Subscribe([&scanIterator](NThreading::TFuture<NYdb::NTable::TScanQueryPartIterator> f) { + scanIterator = f.GetValueSync(); + }); + const TInstant start = TInstant::Now(); + while (!resultReady && start + TDuration::Seconds(60) > TInstant::Now()) { + Server.GetRuntime()->SimulateSleep(TDuration::Seconds(1)); + if (scanIterator && !resultReady) { + scanIterator->ReadNext().Subscribe([&](NThreading::TFuture<NYdb::NTable::TScanQueryPart> streamPartFuture) { + NYdb::NTable::TScanQueryPart streamPart = streamPartFuture.GetValueSync(); + if (!streamPart.IsSuccess()) { + resultReady = true; + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + } else { + UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(), "Unexpected empty scan query response."); + + if (streamPart.HasQueryStats()) { + auto plan = streamPart.GetQueryStats().GetPlan(); + NJson::TJsonValue jsonValue; + if (plan) { + UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, &jsonValue)); + Cerr << jsonValue << Endl; + } + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + NYdb::TResultSetParser rsParser(resultSet); + while (rsParser.TryNextRow()) { + THashMap<TString, NYdb::TValue> row; + for (size_t ci = 0; ci < resultSet.ColumnsCount(); ++ci) { + row.emplace(resultSet.GetColumnsMeta()[ci].Name, rsParser.GetValue(ci)); + Cerr << resultSet.GetColumnsMeta()[ci].Name << "/" << rsParser.GetValue(ci).GetProto().DebugString() << Endl; + } + rows.emplace_back(std::move(row)); + } + } + } + }); + } + } + Cerr << "REQUEST=" << request << ";EXPECTATION=" << expectation << Endl; + UNIT_ASSERT(resultReady); + if (result) { + *result = rows; + } +} + void THelper::StartDataRequest(const TString& request, const bool expectSuccess, TString* result) const { NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false).AuthToken("root@builtin")); diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h index 0abc659b97..38d66ee2ea 100644 --- a/ydb/core/testlib/common_helper.h +++ b/ydb/core/testlib/common_helper.h @@ -19,6 +19,7 @@ public: void DropTable(const TString& tablePath); + void StartScanRequest(const TString& request, const bool expectSuccess, TVector<THashMap<TString, NYdb::TValue>>* result) const; void StartDataRequest(const TString& request, const bool expectSuccess = true, TString* result = nullptr) const; void StartSchemaRequest(const TString& request, const bool expectSuccess = true, const bool waiting = true) const; }; diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index abf45d13ec..b090fd6a4f 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -98,8 +98,8 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_pt runtime->DispatchEvents(options); } -void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const { - auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount); +void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs) const { + auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount, tsStepUs); SendDataViaActorSystem(testTable, batch); } @@ -118,7 +118,7 @@ std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() const { return std::make_shared<arrow::Schema>(std::move(fields)); } -std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const { +std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs) const { std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool()); @@ -136,7 +136,7 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui for (size_t i = 0; i < rowCount; ++i) { std::string uid("uid_" + std::to_string(tsBegin + i)); std::string message("some prefix " + std::string(1024 + i % 200, 'x')); - Y_VERIFY(b1.Append(tsBegin + i).ok()); + Y_VERIFY(b1.Append(tsBegin + i * tsStepUs).ok()); Y_VERIFY(b2.Append(std::to_string(pathIdBegin + i)).ok()); Y_VERIFY(b3.Append(uid).ok()); Y_VERIFY(b4.Append(i % 5).ok()); @@ -325,7 +325,7 @@ std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() const { }); } -std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount) const { +std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount, const ui32 tsStepUs) const { std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); UNIT_ASSERT(schema); UNIT_ASSERT(schema->num_fields()); @@ -353,7 +353,7 @@ std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 break; } case arrow::Type::TIMESTAMP: { - UNIT_ASSERT(builders->GetFieldAs<arrow::TimestampBuilder>(col)->Append(value).ok()); + UNIT_ASSERT(builders->GetFieldAs<arrow::TimestampBuilder>(col)->Append(begin + row * tsStepUs).ok()); break; } case arrow::Type::BINARY: { @@ -395,10 +395,10 @@ std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() const { } std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch() const { - return TestArrowBatch(0, 0, 10); + return TestArrowBatch(0, 0, 10, 1); } -std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount) const { +std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount, const ui32 /*tsStepUs*/) const { rowCount = 10; std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index 0ed78d368f..2548e9905a 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -14,10 +14,10 @@ public: using TBase::TBase; void CreateTestOlapStore(TActorId sender, TString scheme); void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme); - void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const; + void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const; void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) const; - virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const = 0; + virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const = 0; }; class THelper: public THelperSchemaless { @@ -59,7 +59,7 @@ public: } virtual TString GetTestTableSchema() const; - virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const override; + virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const override; }; class TCickBenchHelper: public THelperSchemaless { @@ -180,7 +180,7 @@ public: KeyColumnNames: ["EventTime", "EventDate", "CounterID", "UserID", "WatchID"] )"; - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount) const override; + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount, const ui32 tsStepUs = 1) const override; }; class TTableWithNullsHelper: public THelperSchemaless { @@ -201,7 +201,7 @@ public: KeyColumnNames: "id" )"; - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10) const override; + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10, const ui32 tsStepUs = 1) const override; std::shared_ptr<arrow::RecordBatch> TestArrowBatch() const; }; diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 3982f4318c..196a5b4df3 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -220,6 +220,7 @@ private: }; void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "EnqueueProgressTx")("tablet_id", TabletID()); if (!ProgressTxInFlight) { ProgressTxInFlight = true; Execute(new TTxProgressTx(this), ctx); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 929ddf1be5..a11ee3c94f 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2735,7 +2735,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 planStep = 5000000; ui64 txId = 1000; - // Ovewrite the same data multiple times to produce multiple portions at different timestamps + // Overwrite the same data multiple times to produce multiple portions at different timestamps ui32 numWrites = 14; for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) { UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index 986a9d395c..74c452a487 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -223,7 +223,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT(ts.size() == 2); - ui32 ttlSec = TInstant::Now().Seconds(); // disable internal tll + ui32 ttlSec = TAppData::TimeProvider->Now().Seconds(); // disable internal tll if (internal) { ttlSec -= (ts[0] + ts[1]) / 2; // enable internal ttl between ts1 and ts2 } @@ -288,7 +288,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } // Alter TTL - ttlSec = TInstant::Now().Seconds() - (ts[1] + 1); + ttlSec = TAppData::TimeProvider->Now().Seconds() - (ts[1] + 1); if (spec.HasTiers()) { spec.Tiers[0].EvictAfter = TDuration::Seconds(ttlSec); } else { @@ -743,7 +743,7 @@ std::vector<std::pair<ui32, ui64>> TestTiersAndTtl(const TTestSchema::TTableSpec blobs.emplace_back(std::move(preload[1])); } - TInstant now = TInstant::Now(); + TInstant now = TAppData::TimeProvider->Now(); TDuration allowBoth = TDuration::Seconds(now.Seconds() - ts[0] + 600); TDuration allowOne = TDuration::Seconds(now.Seconds() - ts[1] + 600); TDuration allowNone = TDuration::Seconds(now.Seconds() - ts[1] - 600); diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index 33225ffb37..ebd5de2c4a 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -63,6 +63,45 @@ public: } )", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); } + + void CreateTestOlapTableWithTTL(TString tableName = "olapTable", ui32 tableShardsCount = 3, + TString storeName = "olapStore", ui32 storeShardsCount = 4, + TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { + + TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); + CreateTestOlapStore(sender, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + SchemaPresets { + Name: "default" + Schema { + %s + } + } + )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data())); + + TString shardingColumns = "[\"timestamp\", \"uid\"]"; + if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { + shardingColumns = "[\"uid\"]"; + } + + TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + TtlSettings: { + Enabled: { + ColumnName : "timestamp" + ExpireAfterSeconds : 86400 + } + } + Sharding { + HashSharding { + Function: %s + Columns: %s + } + } + )", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); + } }; @@ -551,5 +590,304 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { #endif } + std::optional<NYdb::TValue> GetValueResult(const THashMap<TString, NYdb::TValue>& hMap, const TString& fName) { + auto it = hMap.find(fName); + if (it == hMap.end()) { + Cerr << fName << ": NOT_FOUND" << Endl; + return {}; + } else { + Cerr << fName << ": " << it->second.GetProto().DebugString() << Endl; + return it->second; + } + } + + class TGCSource { + private: + const ui64 TabletId; + const ui32 Channel; + public: + ui32 GetChannel() const { + return Channel; + } + TGCSource(const ui64 tabletId, const ui32 channel) + : TabletId(tabletId) + , Channel(channel) + { + + } + + bool operator<(const TGCSource& item) const { + return std::tie(TabletId, Channel) < std::tie(item.TabletId, item.Channel); + } + + TString DebugString() const { + return TStringBuilder() << "tId=" << TabletId << ";c=" << Channel << ";"; + } + }; + + class TCurrentBarrier { + private: + ui32 Generation = 0; + ui32 Step = 0; + public: + TCurrentBarrier() = default; + + TCurrentBarrier(const ui32 gen, const ui32 step) + : Generation(gen) + , Step(step) + { + + } + + bool operator<(const TCurrentBarrier& b) const { + return std::tie(Generation, Step) < std::tie(b.Generation, b.Step); + } + + bool IsDeprecated(const NKikimr::TLogoBlobID& id) const { + if (id.Generation() < Generation) { + return true; + } + if (id.Generation() > Generation) { + return false; + } + + if (id.Step() < Step) { + return true; + } + return id.Generation() == Generation && id.Step() == Step; + } + }; + + class TBlobFlags { + private: + bool KeepFlag = false; + bool DontKeepFlag = false; + public: + bool IsRemovable() const { + return !KeepFlag || DontKeepFlag; + } + void Keep() { + KeepFlag = true; + } + void DontKeep() { + DontKeepFlag = true; + } + }; + + class TGCSourceData { + private: + i64 BytesSize = 0; + TCurrentBarrier Barrier; + std::map<NKikimr::TLogoBlobID, TBlobFlags> Blobs; + public: + + TString DebugString() const { + return TStringBuilder() << "size=" << BytesSize << ";count=" << Blobs.size() << ";"; + } + + i64 GetSize() const { + return BytesSize; + } + void AddSize(const ui64 size) { + BytesSize += size; + } + void ReduceSize(const ui64 size) { + BytesSize -= size; + Y_VERIFY(BytesSize >= 0); + } + void SetBarrier(const TCurrentBarrier& b) { + Y_VERIFY(!(b < Barrier)); + Barrier = b; + RefreshBarrier(); + } + + void AddKeep(const TLogoBlobID& id) { + auto it = Blobs.find(id); + if (it != Blobs.end()) { + it->second.Keep(); + } + } + + void AddDontKeep(const TLogoBlobID& id) { + auto it = Blobs.find(id); + if (it != Blobs.end()) { + it->second.DontKeep(); + } + } + + void AddBlob(const TLogoBlobID& id) { + Blobs[id] = TBlobFlags(); + } + + void RefreshBarrier() { + for (auto it = Blobs.begin(); it != Blobs.end();) { + if (Barrier.IsDeprecated(it->first) && it->second.IsRemovable()) { + ReduceSize(it->first.BlobSize()); + it = Blobs.erase(it); + } else { + ++it; + } + } + } + }; + + class TBSDataCollector { + private: + std::map<TGCSource, TGCSourceData> Data; + public: + TGCSourceData& GetData(const TGCSource& id) { + return Data[id]; + } + ui64 GetChannelSize(const ui32 channelId) const { + ui64 result = 0; + for (auto&& i : Data) { + if (i.first.GetChannel() == channelId) { + result += i.second.GetSize(); + } + } + return result; + } + ui64 GetSize() const { + ui64 result = 0; + for (auto&& i : Data) { + result += i.second.GetSize(); + } + return result; + } + TString StatusString() const { + std::map<ui32, TString> info; + for (auto&& i : Data) { + info[i.first.GetChannel()] += i.second.DebugString(); + } + TStringBuilder sb; + for (auto&& i : info) { + sb << i.first << ":" << i.second << ";"; + } + return sb; + } + + }; + + Y_UNIT_TEST(TTLUsage) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(true) + .SetEnableBackgroundTasks(true) + .SetForceColumnTablesCompositeMarks(true); + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); +// runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); +// runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + +// runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_DEBUG); +// runtime.SetLogPriority(NKikimrServices::BG_TASKS, NLog::PRI_DEBUG); + // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG); + + TLocalHelper lHelper(*server); + lHelper.CreateTestOlapTableWithTTL("olapTable", 1); + Cerr << "Wait tables" << Endl; + runtime.SimulateSleep(TDuration::Seconds(20)); + Cerr << "Initialization tables" << Endl; + const ui32 numRecords = 600000; + auto batch = lHelper.TestArrowBatch(0, TInstant::Zero().GetValue(), 600000, 1000000); + + ui32 gcCounter = 0; + TBSDataCollector bsCollector; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (auto* msg = dynamic_cast<TEvBlobStorage::TEvCollectGarbageResult*>(ev->StaticCastAsLocal<IEventBase>())) { + Y_VERIFY(msg->Status == NKikimrProto::EReplyStatus::OK); + } + if (auto* msg = dynamic_cast<TEvBlobStorage::TEvCollectGarbage*>(ev->StaticCastAsLocal<IEventBase>())) { + TGCSource gcSource(msg->TabletId, msg->Channel); + auto& gcSourceData = bsCollector.GetData(gcSource); + if (msg->Keep) { + for (auto&& i : *msg->Keep) { + gcSourceData.AddKeep(i); + } + } + if (msg->DoNotKeep) { + for (auto&& i : *msg->DoNotKeep) { + gcSourceData.AddDontKeep(i); + } + } + + Y_VERIFY(!msg->Hard); + if (msg->Collect) { + gcSourceData.SetBarrier(TCurrentBarrier(msg->CollectGeneration, msg->CollectStep)); + } else { + gcSourceData.RefreshBarrier(); + } + Cerr << "TEvBlobStorage::TEvCollectGarbage " << gcSource.DebugString() << ":" << ++gcCounter << "/" << bsCollector.StatusString() << Endl; + } + if (auto* msg = dynamic_cast<TEvBlobStorage::TEvPut*>(ev->StaticCastAsLocal<IEventBase>())) { + TGCSource gcSource(msg->Id.TabletID(), msg->Id.Channel()); + auto& gcSourceData = bsCollector.GetData(gcSource); + gcSourceData.AddBlob(msg->Id); + gcSourceData.AddSize(msg->Id.BlobSize()); + Cerr << "TEvBlobStorage::TEvPut " << gcSource.DebugString() << ":" << gcCounter << "/" << bsCollector.StatusString() << Endl; + } + return false; + }; + runtime.SetEventFilter(captureEvents); + + lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", batch); + + { + TVector<THashMap<TString, NYdb::TValue>> result; + lHelper.StartScanRequest("SELECT MAX(timestamp) as a, MIN(timestamp) as b, COUNT(*) as c FROM `/Root/olapStore/olapTable`", true, &result); + UNIT_ASSERT(result.size() == 1); + UNIT_ASSERT(result.front().size() == 3); + UNIT_ASSERT(GetValueResult(result.front(), "c")->GetProto().uint64_value() == 600000); + UNIT_ASSERT(GetValueResult(result.front(), "a")->GetProto().uint64_value() == 599999000000); + UNIT_ASSERT(GetValueResult(result.front(), "b")->GetProto().uint64_value() == 0); + } + const ui32 reduceStepsCount = 1; + for (ui32 i = 0; i < reduceStepsCount; ++i) { + runtime.AdvanceCurrentTime(TDuration::Seconds(numRecords * (i + 1) / reduceStepsCount + 500000)); + const TInstant start = TInstant::Now(); + const ui64 purposeSize = 800000000.0 * (1 - 1.0 * (i + 1) / reduceStepsCount); + const ui64 purposeRecords = numRecords * (1 - 1.0 * (i + 1) / reduceStepsCount); + const ui64 purposeMinTimestamp = numRecords * 1.0 * (i + 1) / reduceStepsCount * 1000000; + while (bsCollector.GetChannelSize(2) > purposeSize && TInstant::Now() - start < TDuration::Seconds(60)) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Cerr << bsCollector.GetChannelSize(2) << "/" << purposeSize << Endl; + + TVector<THashMap<TString, NYdb::TValue>> result; + lHelper.StartScanRequest("SELECT MIN(timestamp) as b, COUNT(*) as c FROM `/Root/olapStore/olapTable`", true, &result); + UNIT_ASSERT(result.size() == 1); + UNIT_ASSERT(result.front().size() == 2); + UNIT_ASSERT(GetValueResult(result.front(), "c")->GetProto().uint64_value() == purposeRecords); + if (purposeRecords) { + UNIT_ASSERT(GetValueResult(result.front(), "b")->GetProto().uint64_value() == purposeMinTimestamp); + } + + Y_VERIFY(bsCollector.GetChannelSize(2) <= purposeSize); + } + + { + TVector<THashMap<TString, NYdb::TValue>> result; + lHelper.StartScanRequest("SELECT COUNT(*) FROM `/Root/olapStore/olapTable`", true, &result); + UNIT_ASSERT(result.front().begin()->second.GetProto().uint64_value() == 0); + } + } + } } |