diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-07 19:31:59 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-07 19:31:59 +0300 |
commit | 278a58c5af63dbd7f7a6d8b8d92dc246651242da (patch) | |
tree | 06e7a3c8890910a7a20ea8ca08d3ee9c28dc6296 | |
parent | af2644d6ed63fab5358f2c04b83b9ea6e99cde35 (diff) | |
download | ydb-278a58c5af63dbd7f7a6d8b8d92dc246651242da.tar.gz |
clean tiers on drop table
37 files changed, 789 insertions, 219 deletions
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index 23379eef9aa..40373be49b6 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -123,65 +123,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) { }); } - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { - std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); - - arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool()); - arrow::StringBuilder b2; - arrow::StringBuilder b3; - arrow::Int32Builder b4; - arrow::StringBuilder b5; - - for (size_t i = 0; i < rowCount; ++i) { - std::string uid("uid_" + std::to_string(tsBegin + i)); - std::string message("some prefix " + std::string(1024 + i % 200, 'x')); - Y_VERIFY(b1.Append(tsBegin + i).ok()); - Y_VERIFY(b2.Append(std::to_string(pathIdBegin + i)).ok()); - Y_VERIFY(b3.Append(uid).ok()); - Y_VERIFY(b4.Append(i % 5).ok()); - Y_VERIFY(b5.Append(message).ok()); - } - - std::shared_ptr<arrow::TimestampArray> a1; - std::shared_ptr<arrow::StringArray> a2; - std::shared_ptr<arrow::StringArray> a3; - std::shared_ptr<arrow::Int32Array> a4; - std::shared_ptr<arrow::StringArray> a5; - - Y_VERIFY(b1.Finish(&a1).ok()); - Y_VERIFY(b2.Finish(&a2).ok()); - Y_VERIFY(b3.Finish(&a3).ok()); - Y_VERIFY(b4.Finish(&a4).ok()); - Y_VERIFY(b5.Finish(&a5).ok()); - - return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 }); - } - - TString TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { - auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount); - int64_t size; - auto status = arrow::ipc::GetRecordBatchSize(*batch, &size); - Y_VERIFY(status.ok()); - - TString buf; - buf.resize(size); - auto writer = arrow::Buffer::GetWriter(arrow::MutableBuffer::Wrap(&buf[0], size)); - Y_VERIFY(writer.ok()); - - // UNCOMPRESSED - status = SerializeRecordBatch(*batch, arrow::ipc::IpcWriteOptions::Defaults(), (*writer).get()); - Y_VERIFY(status.ok()); - return buf; - } - void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + TLocalHelper lHelper(kikimr.GetTestServer()); NYdb::NLongTx::TClient client(kikimr.GetDriver()); NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(pathIdBegin, tsBegin, rowCount); + TString data = lHelper.TestBlob(pathIdBegin, tsBegin, rowCount); NLongTx::TLongTxWriteResult resWrite = client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); @@ -191,34 +141,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); } - void SendDataViaActorSystem(NActors::TTestActorRuntime* runtime, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { - std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); - TString serializedSchema = NArrow::SerializeSchema(*schema); - Y_VERIFY(serializedSchema); - - auto batch = TestBlob(pathIdBegin, tsBegin, rowCount); - Y_VERIFY(batch); - - Ydb::Table::BulkUpsertRequest request; - request.mutable_arrow_batch_settings()->set_schema(serializedSchema); - request.set_data(batch); - request.set_table(testTable); - - size_t responses = 0; - auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0)); - future.Subscribe([&](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable { - ++responses; - UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS); - }); - - TDispatchOptions options; - options.CustomFinalCondition = [&]() { - return responses >= 1; - }; - - runtime->DispatchEvents(options); - } - TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it) { TVector<THashMap<TString, NYdb::TValue>> rows; @@ -1600,7 +1522,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ui32 insertRows = 0; const ui32 iterationPackSize = 2000; for (ui64 i = 0; i < numIterations; ++i) { - SendDataViaActorSystem(runtime, "/Root/olapStore/olapTable", 0, 1000000 + i * 1000000, iterationPackSize); + TLocalHelper(*server).SendDataViaActorSystem("/Root/olapStore/olapTable", 0, 1000000 + i * 1000000, iterationPackSize); insertRows += iterationPackSize; } @@ -1966,7 +1888,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TLocalHelper(*server).CreateTestOlapTable("selectTable", "selectStore", numShards, numShards); ui32 insertRows = 0; for(ui64 i = 0; i < numIterations; ++i) { - SendDataViaActorSystem(runtime, "/Root/selectStore/selectTable", 0, 1000000 + i*1000000, 2000); + TLocalHelper(*server).SendDataViaActorSystem("/Root/selectStore/selectTable", 0, 1000000 + i*1000000, 2000); insertRows += 2000; } @@ -2055,7 +1977,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TLocalHelper(*server).CreateTestOlapTable("largeOlapTable", "largeOlapStore", numShards, numShards); ui32 insertRows = 0; for(ui64 i = 0; i < numIterations; ++i) { - SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000); + TLocalHelper(*server).SendDataViaActorSystem("/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000); insertRows += 2000; } @@ -2121,7 +2043,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TLocalHelper(*server).CreateTestOlapTable("largeOlapTable", "largeOlapStore", numShards, numShards); ui32 insertRows = 0; for(ui64 i = 0; i < numIterations; ++i) { - SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000); + TLocalHelper(*server).SendDataViaActorSystem("/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000); insertRows += 2000; } @@ -2187,7 +2109,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ui32 insertRows = 0; const ui32 iterationPackSize = 2000; for (ui64 i = 0; i < numIterations; ++i) { - SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i * 1000000, iterationPackSize); + TLocalHelper(*server).SendDataViaActorSystem("/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i * 1000000, iterationPackSize); insertRows += iterationPackSize; } @@ -2274,7 +2196,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ui32 insertRows = 0; for(ui64 i = 0; i < numIterations; ++i) { - SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000); + TLocalHelper(*server).SendDataViaActorSystem("/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000); insertRows += 2000; } diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 8cb848bfd07..b2b8c6b9a96 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -474,4 +474,6 @@ enum ETxTypes { TXTYPE_ALTER_BLOB_DEPOT_RESULT = 78 [(TxTypeOpts) = {Name: "TxAlterBlobDepotResult"}]; TXTYPE_DROP_BLOB_DEPOT_RESULT = 79 [(TxTypeOpts) = {Name: "TxDropBlobDepotResult"}]; TXTYPE_BLOB_DEPOT_CONFIG_RESULT = 80 [(TxTypeOpts) = {Name: "TxBlobDepotConfigResult"}]; + + TXTYPE_ADD_BACKGROUND_TASK_RESULT = 81 [(TxTypeOpts) = {Name: "TxAddBackgroundTaskResult"}]; } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index e8ffb753a6b..bdfeab0a086 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -889,6 +889,11 @@ message TS3Settings { optional uint32 ConnectionTimeoutMs = 103; }; +message TTaskCleaner { + optional uint64 PathId = 1; + optional TS3Settings StorageSettings = 2; +} + message TBackupTask { optional string TableName = 1; optional uint64 TableId = 2; diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index c01581a8bb6..f8aa3decadc 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -926,5 +926,7 @@ message TActivity { BLOB_DEPOT_BLOCKS_PROCESSOR_ACTOR = 581; TEST_SHARD_LOAD_ACTOR = 582; TEST_SHARD_VALIDATION_ACTOR = 583; + TX_TIERING_TIER_CLEANER = 584; + TX_TIERING_PATH_CLEANER = 585; }; }; diff --git a/ydb/core/testlib/common_helper.cpp b/ydb/core/testlib/common_helper.cpp index 2935060e0d7..45408b456cc 100644 --- a/ydb/core/testlib/common_helper.cpp +++ b/ydb/core/testlib/common_helper.cpp @@ -1,6 +1,12 @@ #include "cs_helper.h" +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <library/cpp/testing/unittest/registar.h> + namespace NKikimr::Tests::NCommon { void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) { @@ -13,4 +19,34 @@ void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) { runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(sender); } +void THelper::StartDataRequest(const TString& request) const { + NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([request](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery(request + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); +} + +void THelper::DropTable(const TString& tablePath) { + auto* runtime = Server.GetRuntime(); + Ydb::Table::DropTableRequest request; + request.set_path(tablePath); + size_t responses = 0; + using TEvDropTableRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::DropTableRequest, + Ydb::Table::DropTableResponse>; + auto future = NRpcService::DoLocalRpc<TEvDropTableRequest>(std::move(request), "", "", runtime->GetActorSystem(0)); + future.Subscribe([&](const NThreading::TFuture<Ydb::Table::DropTableResponse> f) mutable { + ++responses; + UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS); + }); + + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return responses >= 1; + }; + + runtime->DispatchEvents(options); +} + } diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h index 5870a770c6f..25ffdf809bc 100644 --- a/ydb/core/testlib/common_helper.h +++ b/ydb/core/testlib/common_helper.h @@ -13,5 +13,9 @@ public: : Server(server) { } + + void DropTable(const TString& tablePath); + + void StartDataRequest(const TString& request) const; }; } diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index 6441df5be9e..98064014db6 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -1,7 +1,15 @@ #include "cs_helper.h" +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + #include <library/cpp/actors/core/event.h> #include <library/cpp/testing/unittest/registar.h> -#include <ydb/core/tx/tx_proxy/proxy.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_binary.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type_fwd.h> namespace NKikimr::Tests::NCS { @@ -38,4 +46,84 @@ void THelper::CreateTestOlapTable(TActorId sender, TString storeName, TString sc WaitForSchemeOperation(sender, txId); } +std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() { + return std::make_shared<arrow::Schema>( + std::vector<std::shared_ptr<arrow::Field>>{ + arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), + arrow::field("resource_id", arrow::utf8()), + arrow::field("uid", arrow::utf8()), + arrow::field("level", arrow::int32()), + arrow::field("message", arrow::utf8()) + }); +} + +TString THelper::TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount); + return NArrow::SerializeBatchNoCompression(batch); +} + +void THelper::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + auto* runtime = Server.GetRuntime(); + std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); + TString serializedSchema = NArrow::SerializeSchema(*schema); + Y_VERIFY(serializedSchema); + auto batch = TestBlob(pathIdBegin, tsBegin, rowCount); + Y_VERIFY(batch); + + Ydb::Table::BulkUpsertRequest request; + request.mutable_arrow_batch_settings()->set_schema(serializedSchema); + request.set_data(batch); + request.set_table(testTable); + + size_t responses = 0; + using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest, + Ydb::Table::BulkUpsertResponse>; + auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0)); + future.Subscribe([&](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable { + ++responses; + UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS); + }); + + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return responses >= 1; + }; + + runtime->DispatchEvents(options); +} + +std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); + + arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool()); + arrow::StringBuilder b2; + arrow::StringBuilder b3; + arrow::Int32Builder b4; + arrow::StringBuilder b5; + + for (size_t i = 0; i < rowCount; ++i) { + std::string uid("uid_" + std::to_string(tsBegin + i)); + std::string message("some prefix " + std::string(1024 + i % 200, 'x')); + Y_VERIFY(b1.Append(tsBegin + i).ok()); + Y_VERIFY(b2.Append(std::to_string(pathIdBegin + i)).ok()); + Y_VERIFY(b3.Append(uid).ok()); + Y_VERIFY(b4.Append(i % 5).ok()); + Y_VERIFY(b5.Append(message).ok()); + } + + std::shared_ptr<arrow::TimestampArray> a1; + std::shared_ptr<arrow::StringArray> a2; + std::shared_ptr<arrow::StringArray> a3; + std::shared_ptr<arrow::Int32Array> a4; + std::shared_ptr<arrow::StringArray> a5; + + Y_VERIFY(b1.Finish(&a1).ok()); + Y_VERIFY(b2.Finish(&a2).ok()); + Y_VERIFY(b3.Finish(&a3).ok()); + Y_VERIFY(b4.Finish(&a4).ok()); + Y_VERIFY(b5.Finish(&a5).ok()); + + return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 }); +} + } diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index 3c0123ea2b2..74efd67ed1c 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -1,15 +1,23 @@ #pragma once #include "common_helper.h" +#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::Tests::NCS { class THelper: public NCommon::THelper { private: using TBase = NCommon::THelper; + + std::shared_ptr<arrow::Schema> GetArrowSchema(); + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); + public: using TBase::TBase; void CreateTestOlapStore(TActorId sender, TString scheme); void CreateTestOlapTable(TActorId sender, TString storeName, TString scheme); + TString TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); + void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); }; } diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt index 14d9ad8173b..ac703050476 100644 --- a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt +++ b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt @@ -40,7 +40,6 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp ) add_test( NAME diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt index 7e642963ed5..a2b5d071235 100644 --- a/ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt @@ -42,7 +42,6 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp ) add_test( NAME diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt index 1b40ae04947..4de436d1222 100644 --- a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt +++ b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt @@ -44,7 +44,6 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp ) add_test( NAME diff --git a/ydb/core/tx/schemeshard/CMakeLists.txt b/ydb/core/tx/schemeshard/CMakeLists.txt index eba7306656c..d227762db86 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.txt @@ -89,6 +89,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC ydb-library-login library-login-protos library-yql-minikql + ydb-services-bg_tasks ) target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp index bdb673602af..46cde5886d0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp @@ -3,6 +3,7 @@ #include "schemeshard_impl.h" #include <ydb/core/base/subdomain.h> +#include <ydb/core/tx/tiering/cleaner_task.h> namespace NKikimr { namespace NSchemeShard { @@ -262,23 +263,7 @@ private: << " operationId#" << OperationId; } -public: - TProposedDeleteParts(TOperationId id) - : OperationId(id) - { - IgnoreMessages(DebugHint(), - {TEvColumnShard::TEvProposeTransactionResult::EventType, - TEvColumnShard::TEvNotifyTxCompletionResult::EventType, - TEvPrivate::TEvOperationPlan::EventType}); - } - - bool ProgressState(TOperationContext& context) override { - TTabletId ssId = context.SS->SelfTabletId(); - - LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " ProgressState" - << ", at schemeshard: " << ssId); - + bool Finish(TOperationContext& context) { TTxState* txState = context.SS->FindTx(OperationId); Y_VERIFY(txState); Y_VERIFY(txState->TxType == TTxState::TxDropColumnTable); @@ -303,6 +288,40 @@ public: context.OnComplete.DoneOperation(OperationId); return true; } +public: + TProposedDeleteParts(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), + {TEvColumnShard::TEvProposeTransactionResult::EventType, + TEvColumnShard::TEvNotifyTxCompletionResult::EventType, + TEvPrivate::TEvOperationPlan::EventType}); + } + + bool HandleReply(NBackgroundTasks::TEvAddTaskResult::TPtr& ev, TOperationContext& context) override { + Y_VERIFY(ev->Get()->IsSuccess()); + return Finish(context); + } + + bool ProgressState(TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); + Y_VERIFY(txState->TxType == TTxState::TxDropColumnTable); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << ssId); + + if (NBackgroundTasks::TServiceOperator::IsEnabled()) { + NBackgroundTasks::TTask task(std::make_shared<NColumnShard::NTiers::TTaskCleanerActivity>(txState->TargetPathId.LocalPathId), nullptr); + task.SetId(OperationId.SerializeToString()); + context.SS->SelfId().Send(NBackgroundTasks::MakeServiceId(context.SS->SelfId().NodeId()), new NBackgroundTasks::TEvAddTask(std::move(task))); + return false; + } else { + return Finish(context); + } + } }; class TDropColumnTable : public TSubOperation { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp index 1e769353310..e30f7511a83 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp @@ -7,11 +7,18 @@ namespace NSchemeShard { template<class T> struct TDebugEvent { - static TString ToString (const typename T::TPtr & ev) { + static TString ToString(const typename T::TPtr& ev) { return ev->Get()->Record.ShortDebugString(); } }; +template<> +struct TDebugEvent<NBackgroundTasks::TEvAddTaskResult> { + static TString ToString(const NBackgroundTasks::TEvAddTaskResult::TPtr& ev) { + return ev->Get()->GetDebugString(); + } +}; + template <> struct TDebugEvent<TEvPrivate::TEvOperationPlan> { static TString ToString (const TEvPrivate::TEvOperationPlan::TPtr& ev) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index cfe934dc32e..543a19aa69d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -21,6 +21,7 @@ #include <ydb/core/blockstore/core/blockstore.h> #include <ydb/core/filestore/core/filestore.h> +#include <ydb/services/bg_tasks/service.h> #include <util/generic/ptr.h> #include <util/generic/set.h> @@ -39,6 +40,7 @@ action(TEvDataShard::TEvSplitPartitioningChangedAck, NSchemeShard::TXTYPE_SPLIT_PARTITIONING_CHANGED_DST_ACK) \ \ action(TEvColumnShard::TEvProposeTransactionResult, NSchemeShard::TXTYPE_COLUMNSHARD_PROPOSE_RESULT) \ + action(NBackgroundTasks::TEvAddTaskResult, NSchemeShard::TXTYPE_ADD_BACKGROUND_TASK_RESULT) \ action(TEvColumnShard::TEvNotifyTxCompletionResult, NSchemeShard::TXTYPE_COLUMNSHARD_NOTIFY_TX_COMPLETION_RESULT) \ \ action(NSequenceShard::TEvSequenceShard::TEvCreateSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_CREATE_SEQUENCE_RESULT) \ diff --git a/ydb/core/tx/schemeshard/schemeshard_identificators.h b/ydb/core/tx/schemeshard/schemeshard_identificators.h index 83472071526..989716527fa 100644 --- a/ydb/core/tx/schemeshard/schemeshard_identificators.h +++ b/ydb/core/tx/schemeshard/schemeshard_identificators.h @@ -75,6 +75,34 @@ public: explicit operator bool() const { return GetTxId() != InvalidTxId && GetSubTxId() != InvalidSubTxId; } + + TString SerializeToString() const { + return "SSO:" + ::ToString(GetTxId().GetValue()) + ":" + ::ToString(GetSubTxId()); + } + + bool DeserializeFromString(const TString& data) { + TStringBuf sb(data.data(), data.size()); + if (!sb.StartsWith("SSO:")) { + return false; + } + sb.Skip(4); + TStringBuf l; + TStringBuf r; + if (!sb.TrySplit(':', l, r)) { + return false; + } + ui64 txId; + TSubTxId subTxId; + if (!TryFromString(l, txId)) { + return false; + } + if (!TryFromString(r, subTxId)) { + return false; + } + first = TTxId(txId); + second = subTxId; + return true; + } }; constexpr TOperationId InvalidOperationId = TOperationId(InvalidTxId, InvalidSubTxId); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index de6e27a2eee..66032109e2b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4056,6 +4056,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { // HFuncTraced(TEvColumnShard::TEvProposeTransactionResult, Handle); + HFuncTraced(NBackgroundTasks::TEvAddTaskResult, Handle); HFuncTraced(TEvColumnShard::TEvNotifyTxCompletionResult, Handle); // sequence shard @@ -5020,6 +5021,38 @@ void TSchemeShard::Handle(TEvBlobDepot::TEvApplyConfigResult::TPtr& ev, const TA } } +void TSchemeShard::Handle(NBackgroundTasks::TEvAddTaskResult::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Handle NBackgroundTasks::TEvAddTaskResult" + << ", at schemeshard: " << TabletID() + << ", message: " << ev->Get()->GetDebugString()); + TOperationId id; + if (!id.DeserializeFromString(ev->Get()->GetTaskId())) { + LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Got NBackgroundTasks::TEvAddTaskResult cannot parse operation id in result" + << ", message: " << ev->Get()->GetDebugString() + << ", at schemeshard: " << TabletID()); + return; + } + if (!Operations.contains(id.GetTxId())) { + LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Got NBackgroundTasks::TEvAddTaskResult for unknown txId, ignore it" + << ", txId: " << id.SerializeToString() + << ", message: " << ev->Get()->GetDebugString() + << ", at schemeshard: " << TabletID()); + return; + } + if (!ev->Get()->IsSuccess()) { + LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Got NBackgroundTasks::TEvAddTaskResult cannot execute" + << ", txId: " << id.SerializeToString() + << ", message: " << ev->Get()->GetDebugString() + << ", at schemeshard: " << TabletID()); + return; + } + Execute(CreateTxOperationReply(id, ev), ctx); +} + void TSchemeShard::Handle(TEvColumnShard::TEvProposeTransactionResult::TPtr &ev, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Handle TEvProposeTransactionResult" diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 859562ecfb1..572cb913bbc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -51,6 +51,8 @@ #include <ydb/library/login/login.h> +#include <ydb/services/bg_tasks/service.h> + #include <util/generic/ptr.h> namespace NKikimr { @@ -921,7 +923,8 @@ public: void Handle(TEvPrivate::TEvSubscribeToShardDeletion::TPtr &ev, const TActorContext &ctx); void Handle(TEvHive::TEvDeleteOwnerTabletsReply::TPtr &ev, const TActorContext &ctx); void Handle(TEvPersQueue::TEvDropTabletReply::TPtr &ev, const TActorContext &ctx); - void Handle(TEvColumnShard::TEvProposeTransactionResult::TPtr &ev, const TActorContext &ctx); + void Handle(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx); + void Handle(NBackgroundTasks::TEvAddTaskResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvColumnShard::TEvNotifyTxCompletionResult::TPtr &ev, const TActorContext &ctx); void Handle(NSequenceShard::TEvSequenceShard::TEvCreateSequenceResult::TPtr &ev, const TActorContext &ctx); void Handle(NSequenceShard::TEvSequenceShard::TEvDropSequenceResult::TPtr &ev, const TActorContext &ctx); diff --git a/ydb/core/tx/tiering/CMakeLists.txt b/ydb/core/tx/tiering/CMakeLists.txt index 48b8969a74c..2a598bd407c 100644 --- a/ydb/core/tx/tiering/CMakeLists.txt +++ b/ydb/core/tx/tiering/CMakeLists.txt @@ -18,9 +18,13 @@ target_link_libraries(core-tx-tiering PUBLIC core-tablet_flat-protos ydb-core-wrappers api-protos + services-bg_tasks-abstract ydb-services-metadata ) target_sources(core-tx-tiering PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/common.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_cleaner.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/path_cleaner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/decoder.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp @@ -28,3 +32,21 @@ target_sources(core-tx-tiering PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/s3_actor.cpp ) + +add_global_library_for(core-tx-tiering.global core-tx-tiering) +target_link_libraries(core-tx-tiering.global PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-json-writer + ydb-core-blobstorage + ydb-core-protos + core-tablet_flat-protos + ydb-core-wrappers + api-protos + services-bg_tasks-abstract + ydb-services-metadata +) +target_sources(core-tx-tiering.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/cleaner_task.cpp +) diff --git a/ydb/core/tx/tiering/cleaner_task.cpp b/ydb/core/tx/tiering/cleaner_task.cpp new file mode 100644 index 00000000000..1872a955349 --- /dev/null +++ b/ydb/core/tx/tiering/cleaner_task.cpp @@ -0,0 +1,25 @@ +#include "cleaner_task.h" +#include "path_cleaner.h" + +namespace NKikimr::NColumnShard::NTiers { + +TTaskCleanerActivity::TFactory::TRegistrator<TTaskCleanerActivity> TTaskCleanerActivity::Registrator(TTaskCleanerActivity::GetClassNameStatic()); + +NKikimrSchemeOp::TTaskCleaner TTaskCleanerActivity::DoSerializeToProto() const { + NKikimrSchemeOp::TTaskCleaner result; + result.SetPathId(PathId); + return result; +} + +bool TTaskCleanerActivity::DoDeserializeFromProto(const NKikimrSchemeOp::TTaskCleaner& protoData) { + PathId = protoData.GetPathId(); + return true; +} + +void TTaskCleanerActivity::DoExecute(NBackgroundTasks::ITaskExecutorController::TPtr controller, + const NBackgroundTasks::TTaskStateContainer& /*state*/) +{ + TActivationContext::AsActorContext().Register(new TPathCleaner(PathId, controller)); +} + +} diff --git a/ydb/core/tx/tiering/cleaner_task.h b/ydb/core/tx/tiering/cleaner_task.h new file mode 100644 index 00000000000..d036fc8aebf --- /dev/null +++ b/ydb/core/tx/tiering/cleaner_task.h @@ -0,0 +1,37 @@ +#pragma once +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/services/bg_tasks/abstract/activity.h> +#include <ydb/services/bg_tasks/abstract/interface.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TTaskCleanerActivity: public NBackgroundTasks::IProtoStringSerializable<NKikimrSchemeOp::TTaskCleaner, NBackgroundTasks::ITaskActivity> { +private: + YDB_READONLY(ui64, PathId, 0); + static TFactory::TRegistrator<TTaskCleanerActivity> Registrator; +protected: + virtual NKikimrSchemeOp::TTaskCleaner DoSerializeToProto() const override; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TTaskCleaner& protoData) override; + + virtual void DoExecute(NBackgroundTasks::ITaskExecutorController::TPtr controller, + const NBackgroundTasks::TTaskStateContainer& /*state*/) override; +public: + TTaskCleanerActivity() = default; + + TTaskCleanerActivity(const ui64 pathId) + : PathId(pathId) + { + + } + + static TString GetClassNameStatic() { + return "sso_path_tiers_cleaner"; + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} diff --git a/ydb/core/tx/tiering/common.cpp b/ydb/core/tx/tiering/common.cpp new file mode 100644 index 00000000000..3c7605000d9 --- /dev/null +++ b/ydb/core/tx/tiering/common.cpp @@ -0,0 +1,5 @@ +#include "common.h" + +namespace NKikimr::NColumnShard::NTiers { + +} diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h new file mode 100644 index 00000000000..a891f6acf8d --- /dev/null +++ b/ydb/core/tx/tiering/common.h @@ -0,0 +1,14 @@ +#pragma once +#include <ydb/core/base/events.h> + +#include <library/cpp/actors/core/events.h> + +namespace NKikimr::NColumnShard::NTiers { + +enum EEvents { + EvTierCleared = EventSpaceBegin(TKikimrEvents::ES_TIERING), + EvEnd +}; + +static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_TIERING), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TIERING)"); +} diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp index ebabf12bcb3..eab129ed7b1 100644 --- a/ydb/core/tx/tiering/external_data.cpp +++ b/ydb/core/tx/tiering/external_data.cpp @@ -17,7 +17,7 @@ bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQuery for (auto&& r : rawData.rows()) { TTierConfig config; if (!config.DeserializeFromRecord(decoder, r)) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse tier config from snapshot"; + ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot parse tier config from snapshot"; continue; } TierConfigs.emplace(config.GetConfigId(), config); @@ -31,7 +31,7 @@ bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQuery for (auto&& r : rawData.rows()) { TTieringRule tr; if (!tr.DeserializeFromRecord(decoder, r)) { - ALS_WARN(NKikimrServices::TX_COLUMNSHARD) << "cannot parse record for tiering info"; + ALS_WARN(NKikimrServices::TX_TIERING) << "cannot parse record for tiering info"; continue; } rulesLocal.emplace_back(std::move(tr)); @@ -69,6 +69,25 @@ const TTableTiering* TConfigsSnapshot::GetTableTiering(const TString& tablePath) } } +std::vector<NKikimr::NColumnShard::NTiers::TTierConfig> TConfigsSnapshot::GetTiersForPathId(const ui64 pathId) const { + std::vector<TTierConfig> result; + std::set<TString> readyIds; + for (auto&& i : TableTierings) { + for (auto&& r : i.second.GetRules()) { + if (r.GetTablePathId() == pathId) { + auto it = TierConfigs.find(r.GetTierId()); + if (it == TierConfigs.end()) { + ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency tiering for " << r.GetTierId(); + continue; + } else if (readyIds.emplace(r.GetTierId()).second) { + result.emplace_back(it->second); + } + } + } + } + return result; +} + TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const { TVector<NMetadataProvider::ITableModifier::TPtr> result; { diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h index bb03ed8c9f4..3a01c87c482 100644 --- a/ydb/core/tx/tiering/external_data.h +++ b/ydb/core/tx/tiering/external_data.h @@ -21,6 +21,7 @@ protected: virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; virtual TString DoSerializeToString() const override; public: + std::vector<TTierConfig> GetTiersForPathId(const ui64 pathId) const; const TTableTiering* GetTableTiering(const TString& tablePath) const; void RemapTablePathToId(const TString& path, const ui64 pathId); std::optional<TTierConfig> GetValue(const TString& key) const; diff --git a/ydb/core/tx/tiering/path_cleaner.cpp b/ydb/core/tx/tiering/path_cleaner.cpp new file mode 100644 index 00000000000..5e6d54ad488 --- /dev/null +++ b/ydb/core/tx/tiering/path_cleaner.cpp @@ -0,0 +1,55 @@ +#include "path_cleaner.h" +#include "external_data.h" + +#include <ydb/services/metadata/service.h> + +namespace NKikimr::NColumnShard::NTiers { + +void TPathCleaner::Handle(TEvTierCleared::TPtr& ev) { + TiersWait.erase(ev->Get()->GetTierName()); + Truncated |= ev->Get()->GetTruncated(); + if (TiersWait.empty()) { + if (Truncated) { + Controller->TaskInterrupted(nullptr); + } else { + Controller->TaskFinished(); + } + } +} + +NMetadataProvider::ISnapshotParser::TPtr TPathCleaner::GetTieringSnapshotParser() const { + if (!ExternalDataManipulation) { + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); + auto edmPtr = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation); + edmPtr->Start(edmPtr); + } + return ExternalDataManipulation; +} + +void TPathCleaner::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { + auto configsSnapshot = ev->Get()->GetSnapshotAs<TConfigsSnapshot>(); + Y_VERIFY(configsSnapshot); + Send(NMetadataProvider::MakeServiceId(SelfId().NodeId()), new NMetadataProvider::TEvUnsubscribeExternal(GetTieringSnapshotParser())); + std::vector<TTierConfig> configs = configsSnapshot->GetTiersForPathId(PathId); + for (auto&& i : configs) { + auto config = NWrappers::IExternalStorageConfig::Construct(i.GetProtoConfig().GetObjectStorage()); + if (!config) { + ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot construct storage config for " << i.GetTierName(); + continue; + } + Register(new TTierCleaner(i.GetTierName(), SelfId(), PathId, config)); + TiersWait.emplace(i.GetTierName()); + } +} + +void TPathCleaner::Bootstrap() { + Become(&TPathCleaner::StateMain); + Send(NMetadataProvider::MakeServiceId(SelfId().NodeId()), new NMetadataProvider::TEvSubscribeExternal(GetTieringSnapshotParser())); +} + +TPathCleaner::TPathCleaner(const ui64 pathId, NBackgroundTasks::ITaskExecutorController::TPtr controller) + : PathId(pathId) + , Controller(controller) { +} + +} diff --git a/ydb/core/tx/tiering/path_cleaner.h b/ydb/core/tx/tiering/path_cleaner.h new file mode 100644 index 00000000000..44963a3e986 --- /dev/null +++ b/ydb/core/tx/tiering/path_cleaner.h @@ -0,0 +1,44 @@ +#pragma once +#include "common.h" +#include "tier_cleaner.h" + +#include <ydb/core/wrappers/abstract.h> +#include <ydb/core/wrappers/events/list_objects.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/services/bg_tasks/abstract/activity.h> +#include <ydb/services/metadata/abstract/common.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TPathCleaner: public TActorBootstrapped<TPathCleaner> { +private: + YDB_READONLY(ui64, PathId, 0); + bool Truncated = false; + std::set<TString> TiersWait; + NBackgroundTasks::ITaskExecutorController::TPtr Controller; + mutable NMetadataProvider::ISnapshotParser::TPtr ExternalDataManipulation; + NMetadataProvider::ISnapshotParser::TPtr GetTieringSnapshotParser() const; +protected: + void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev); + void Handle(TEvTierCleared::TPtr& ev); +public: + TPathCleaner(const ui64 pathId, NBackgroundTasks::ITaskExecutorController::TPtr controller); + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::TX_TIERING_PATH_CLEANER; + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); + hFunc(TEvTierCleared, Handle); + default: + break; + } + } + + void Bootstrap(); +}; +} diff --git a/ydb/core/tx/tiering/rule.h b/ydb/core/tx/tiering/rule.h index 4481f500925..1187efc8ff1 100644 --- a/ydb/core/tx/tiering/rule.h +++ b/ydb/core/tx/tiering/rule.h @@ -89,7 +89,7 @@ public: if (Rules.size()) { Y_VERIFY(Rules.back().GetDurationForEvict() <= tr.GetDurationForEvict()); if (Column != tr.GetColumn()) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "inconsistency rule column: " << + ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency rule column: " << TablePath << "/" << Column << " != " << tr.GetColumn(); return; } diff --git a/ydb/core/tx/tiering/tier_cleaner.cpp b/ydb/core/tx/tiering/tier_cleaner.cpp new file mode 100644 index 00000000000..e7b8f94d850 --- /dev/null +++ b/ydb/core/tx/tiering/tier_cleaner.cpp @@ -0,0 +1,58 @@ +#include "tier_cleaner.h" + +namespace NKikimr::NColumnShard::NTiers { + +void TTierCleaner::Handle(NWrappers::NExternalStorage::TEvListObjectsResponse::TPtr& ev) { + Truncated = ev->Get()->GetResult().GetIsTruncated(); + auto request = Aws::S3::Model::DeleteObjectsRequest(); + + Aws::S3::Model::Delete deleteInfo; + for (auto&& i : ev->Get()->GetResult().GetContents()) { + Aws::S3::Model::ObjectIdentifier id; + id.WithKey(i.GetKey()); + deleteInfo.AddObjects(std::move(id)); + } + if (ev->Get()->GetResult().GetContents().empty()) { + Send(OwnerId, new TEvTierCleared(TierName, false)); + } else { + request.WithDelete(deleteInfo); + Send(SelfId(), new NWrappers::TEvExternalStorage::TEvDeleteObjectsRequest(request)); + } +} + +void TTierCleaner::Handle(NWrappers::NExternalStorage::TEvDeleteObjectsRequest::TPtr& ev) { + Storage->Execute(ev); +} + +void TTierCleaner::Handle(NWrappers::NExternalStorage::TEvDeleteObjectsResponse::TPtr& ev) { + if (!ev->Get()->IsSuccess()) { + ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot remove objects pack: " << ev->Get()->GetResult(); + Send(OwnerId, new TEvTierCleared(TierName, true)); + } else { + Send(OwnerId, new TEvTierCleared(TierName, Truncated)); + } +} + +void TTierCleaner::Handle(NWrappers::NExternalStorage::TEvListObjectsRequest::TPtr& ev) { + Storage->Execute(ev); +} + +void TTierCleaner::Bootstrap() { + Become(&TTierCleaner::StateMain); + + auto request = Aws::S3::Model::ListObjectsRequest() + .WithPrefix("S3-" + ::ToString(PathId)); + + Send(SelfId(), new NWrappers::TEvExternalStorage::TEvListObjectsRequest(request)); +} + +TTierCleaner::TTierCleaner(const TString& tierName, const TActorId& ownerId, + const ui64 pathId, NWrappers::IExternalStorageConfig::TPtr storageConfig) + : TierName(tierName) + , OwnerId(ownerId) + , PathId(pathId) + , StorageConfig(storageConfig) { + Storage = StorageConfig->ConstructStorageOperator(); + Y_VERIFY(Storage); +} +} diff --git a/ydb/core/tx/tiering/tier_cleaner.h b/ydb/core/tx/tiering/tier_cleaner.h new file mode 100644 index 00000000000..9c8d421ad08 --- /dev/null +++ b/ydb/core/tx/tiering/tier_cleaner.h @@ -0,0 +1,61 @@ +#pragma once +#include "common.h" + +#include <ydb/core/wrappers/abstract.h> +#include <ydb/core/wrappers/events/list_objects.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/services/bg_tasks/abstract/activity.h> +#include <ydb/services/metadata/abstract/common.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TEvTierCleared: public TEventLocal<TEvTierCleared, EEvents::EvTierCleared> { +private: + YDB_READONLY_DEF(TString, TierName); + YDB_READONLY(bool, Truncated, false); +public: + TEvTierCleared(const TString& tierName, const bool truncated) + : TierName(tierName) + , Truncated(truncated) + { + + } +}; + +class TTierCleaner: public TActorBootstrapped<TTierCleaner> { +private: + const TString TierName; + const TActorId OwnerId; + const ui64 PathId = 0; + NWrappers::NExternalStorage::IExternalStorageConfig::TPtr StorageConfig; + NWrappers::NExternalStorage::IExternalStorageOperator::TPtr Storage; + bool Truncated = true; +protected: + void Handle(NWrappers::NExternalStorage::TEvListObjectsResponse::TPtr& ev); + void Handle(NWrappers::NExternalStorage::TEvDeleteObjectsResponse::TPtr& ev); + void Handle(NWrappers::NExternalStorage::TEvListObjectsRequest::TPtr& ev); + void Handle(NWrappers::NExternalStorage::TEvDeleteObjectsRequest::TPtr& ev); +public: + TTierCleaner(const TString& tierName, const TActorId& ownerId, const ui64 pathId, + NWrappers::IExternalStorageConfig::TPtr storageConfig); + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::TX_TIERING_TIER_CLEANER; + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NWrappers::NExternalStorage::TEvListObjectsResponse, Handle); + hFunc(NWrappers::NExternalStorage::TEvDeleteObjectsResponse, Handle); + hFunc(NWrappers::NExternalStorage::TEvListObjectsRequest, Handle); + hFunc(NWrappers::NExternalStorage::TEvDeleteObjectsRequest, Handle); + default: + break; + } + } + + void Bootstrap(); +}; +} diff --git a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index 851166f30d4..3b1caea1d88 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -1,4 +1,3 @@ -#include "columnshard_ut_common.h" #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/testlib/cs_helper.h> #include <ydb/core/tx/tiering/external_data.h> @@ -6,11 +5,14 @@ #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/wrappers/ut_helpers/s3_mock.h> #include <ydb/core/wrappers/s3_wrapper.h> +#include <ydb/core/wrappers/fake_storage.h> +#include <ydb/library/accessor/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/services/metadata/service.h> #include <library/cpp/actors/core/av_bootstrapped.h> #include <library/cpp/protobuf/json/proto2json.h> +#include <library/cpp/testing/unittest/registar.h> #include <util/system/hostname.h> @@ -135,7 +137,9 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { for (const TInstant start = Now(); !IsFound() && Now() - start < TDuration::Seconds(10); ) { runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); + runtime.UpdateCurrentTime(Now()); } + runtime.SetObserverFunc(TTestActorRuntime::DefaultObserverFunc); Y_VERIFY(IsFound()); } @@ -225,46 +229,19 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { TTestCSEmulator* emulator = new TTestCSEmulator; emulator->MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); runtime.Register(emulator); - { - const TInstant start = Now(); - while (Now() - start < TDuration::Seconds(10)) { - runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { - Cerr << "Step " << event->Type << Endl; - return false; - }, {}, TDuration::Seconds(1)); - Sleep(TDuration::Seconds(1)); - Cerr << "Step finished" << Endl; - } - } + runtime.SimulateSleep(TDuration::Seconds(10)); + Cerr << "Initialization finished" << Endl; - { - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto session = f.GetValueSync().GetSession(); - session.ExecuteDataQuery( - "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - } - { - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto session = f.GetValueSync().GetSession(); - session.ExecuteDataQuery( - "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " - "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - } + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"); + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); const TInstant start = Now(); while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { - runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); + runtime.SimulateSleep(TDuration::Seconds(1)); } Y_VERIFY(emulator->IsFound()); } - //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); - //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); } Y_UNIT_TEST(DSConfigs) { @@ -295,36 +272,14 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_INFO); -// runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG); - for (const TInstant start = Now(); Now() - start < TDuration::Seconds(10); ) { - runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { - Cerr << "Step " << event->Type << Endl; - return false; - }, {}, TDuration::Seconds(1)); - Sleep(TDuration::Seconds(1)); - Cerr << "Step finished" << Endl; - } - - { - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto session = f.GetValueSync().GetSession(); - session.ExecuteDataQuery( - "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - } - { - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto session = f.GetValueSync().GetSession(); - session.ExecuteDataQuery( - "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " - "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - } + // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG); + runtime.SimulateSleep(TDuration::Seconds(10)); + Cerr << "Initialization finished" << Endl; + + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"); + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); { TTestCSEmulator emulator; emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); @@ -332,27 +287,10 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { emulator.CheckRuntime(runtime); } - { - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto session = f.GetValueSync().GetSession(); - session.ExecuteDataQuery( - "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr + "')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - } - { - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto session = f.GetValueSync().GetSession(); - session.ExecuteDataQuery( - "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " - "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - } - + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr + "')"); + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')"); { TTestCSEmulator emulator; emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); @@ -361,24 +299,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { emulator.CheckRuntime(runtime); } - { - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto session = f.GetValueSync().GetSession(); - session.ExecuteDataQuery( - "DELETE FROM `/Root/.external_data/tiers`" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - } - { - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto session = f.GetValueSync().GetSession(); - session.ExecuteDataQuery( - "DELETE FROM `/Root/.external_data/tiering`" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - } + lHelper.StartDataRequest("DELETE FROM `/Root/.external_data/tiers`"); + lHelper.StartDataRequest("DELETE FROM `/Root/.external_data/tiering`"); { TTestCSEmulator emulator; emulator.SetTieringsCount(0); @@ -388,6 +310,137 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); } +//#define S3_TEST_USAGE +#ifdef S3_TEST_USAGE + const TString TierConfigProtoStr = + R"( + Name : "fakeTier" + ObjectStorage : { + Scheme: HTTP + VerifySSL: false + Endpoint: "storage.cloud-preprod.yandex.net" + Bucket: "tiering-test-01" + AccessKey: "..." + SecretKey: "..." + ProxyHost: "localhost" + ProxyPort: 8080 + ProxyScheme: HTTP + } + )"; + const TString TierEndpoint = "storage.cloud-preprod.yandex.net"; +#else + const TString TierConfigProtoStr = + R"( + Name : "fakeTier" + ObjectStorage : { + Endpoint: "fake" + Bucket: "fake" + } + )"; + const TString TierEndpoint = "fake"; +#endif + + Y_UNIT_TEST(TieringUsage) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(true) + .SetEnableBackgroundTasks(true) + .SetEnableOlapSchemaOperations(true); + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + TLocalHelper lHelper(*server); + lHelper.CreateTestOlapTable("olapTable"); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::BG_TASKS, NLog::PRI_DEBUG); + // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG); + Cerr << "Wait initialization" << Endl; + runtime.SimulateSleep(TDuration::Seconds(20)); + Cerr << "Initialization finished" << Endl; + + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'fakeTier1', '" + TierConfigProtoStr + "')"); + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'fakeTier2', '" + TierConfigProtoStr + "')"); + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + "VALUES ('/Root/olapStore', 'fakeTier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); + lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + "VALUES ('/Root/olapStore', 'fakeTier2', '/Root/olapStore/olapTable', 'timestamp', '20d')"); + { + TTestCSEmulator emulator; + emulator.MutableCheckers().emplace("/Root/olapStore.fakeTier1", TJsonChecker("Name", "fakeTier")); + emulator.MutableCheckers().emplace("/Root/olapStore.fakeTier2", TJsonChecker("ObjectStorage.Endpoint", TierEndpoint)); + emulator.SetTieringsCount(2); + emulator.CheckRuntime(runtime); + } + Cerr << "Insert..." << Endl; + const TInstant pkStart = Now() - TDuration::Days(15); + ui32 idx = 0; + lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", 0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 2000); + { + const TInstant start = Now(); + bool check = false; + while (Now() - start < TDuration::Seconds(60)) { + Cerr << "Waiting..." << Endl; +#ifndef S3_TEST_USAGE + if (Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetSize()) { + check = true; + Cerr << "Fake storage filled" << Endl; + break; + } +#else + check = true; +#endif + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(check); + } +#ifdef S3_TEST_USAGE + Cerr << "storage initialized..." << Endl; +#endif + + lHelper.DropTable("/Root/olapStore/olapTable"); + { + const TInstant start = Now(); + bool check = false; + while (Now() - start < TDuration::Seconds(60)) { + Cerr << "Cleaning waiting..." << Endl; +#ifndef S3_TEST_USAGE + if (!Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetSize()) { + check = true; + Cerr << "Fake storage clean" << Endl; + break; + } +#else + check = true; +#endif + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(check); + } +#ifndef S3_TEST_USAGE + Y_VERIFY(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucketsCount() == 1); +#endif + } } } diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h index 1a07c093991..4b84cd589e1 100644 --- a/ydb/services/bg_tasks/ds_table/executor.h +++ b/ydb/services/bg_tasks/ds_table/executor.h @@ -94,6 +94,7 @@ public: : TBase(config.GetRequestConfig()) , Config(config) { + TServiceOperator::Register(); } }; diff --git a/ydb/services/bg_tasks/service.cpp b/ydb/services/bg_tasks/service.cpp index 9c1b0dec5b0..9fbd6fcec2e 100644 --- a/ydb/services/bg_tasks/service.cpp +++ b/ydb/services/bg_tasks/service.cpp @@ -6,4 +6,12 @@ NActors::TActorId MakeServiceId(const ui32 nodeId) { return NActors::TActorId(nodeId, "SrvcBgrdTask"); } +void TServiceOperator::Register() { + Singleton<TServiceOperator>()->EnabledFlag = true; +} + +bool TServiceOperator::IsEnabled() { + return Singleton<TServiceOperator>()->EnabledFlag; +} + } diff --git a/ydb/services/bg_tasks/service.h b/ydb/services/bg_tasks/service.h index 0a022c60a56..308836037fc 100644 --- a/ydb/services/bg_tasks/service.h +++ b/ydb/services/bg_tasks/service.h @@ -70,6 +70,15 @@ public: using TBase::TBase; }; +class TServiceOperator { +private: + friend class TExecutor; + bool EnabledFlag = false; + static void Register(); +public: + static bool IsEnabled(); +}; + NActors::TActorId MakeServiceId(const ui32 nodeId); } diff --git a/ydb/services/metadata/abstract/CMakeLists.txt b/ydb/services/metadata/abstract/CMakeLists.txt index 7ea18da9cfd..e635a4400ea 100644 --- a/ydb/services/metadata/abstract/CMakeLists.txt +++ b/ydb/services/metadata/abstract/CMakeLists.txt @@ -13,6 +13,7 @@ target_link_libraries(services-metadata-abstract PUBLIC yutil ydb-library-accessor cpp-actors-core + services-metadata-request api-protos ydb-core-base ) diff --git a/ydb/services/metadata/service.cpp b/ydb/services/metadata/service.cpp index 736d41bfc56..70714c1c43b 100644 --- a/ydb/services/metadata/service.cpp +++ b/ydb/services/metadata/service.cpp @@ -2,8 +2,8 @@ namespace NKikimr::NMetadataProvider { -NActors::TActorId MakeServiceId(ui32 node) { - return NActors::TActorId(node, "SrvcMetaData"); +NActors::TActorId MakeServiceId(const ui32 nodeId) { + return NActors::TActorId(nodeId, "SrvcMetaData"); } } diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h index 888ec5763b4..858986f779a 100644 --- a/ydb/services/metadata/service.h +++ b/ydb/services/metadata/service.h @@ -27,6 +27,6 @@ public: } }; -NActors::TActorId MakeServiceId(ui32 node = 0); +NActors::TActorId MakeServiceId(const ui32 node); } |