aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-11-07 19:31:59 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-11-07 19:31:59 +0300
commit278a58c5af63dbd7f7a6d8b8d92dc246651242da (patch)
tree06e7a3c8890910a7a20ea8ca08d3ee9c28dc6296
parentaf2644d6ed63fab5358f2c04b83b9ea6e99cde35 (diff)
downloadydb-278a58c5af63dbd7f7a6d8b8d92dc246651242da.tar.gz
clean tiers on drop table
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp94
-rw-r--r--ydb/core/protos/counters_schemeshard.proto2
-rw-r--r--ydb/core/protos/flat_scheme_op.proto5
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/core/testlib/common_helper.cpp36
-rw-r--r--ydb/core/testlib/common_helper.h4
-rw-r--r--ydb/core/testlib/cs_helper.cpp90
-rw-r--r--ydb/core/testlib/cs_helper.h8
-rw-r--r--ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp53
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.cpp9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_identificators.h28
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp33
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h5
-rw-r--r--ydb/core/tx/tiering/CMakeLists.txt22
-rw-r--r--ydb/core/tx/tiering/cleaner_task.cpp25
-rw-r--r--ydb/core/tx/tiering/cleaner_task.h37
-rw-r--r--ydb/core/tx/tiering/common.cpp5
-rw-r--r--ydb/core/tx/tiering/common.h14
-rw-r--r--ydb/core/tx/tiering/external_data.cpp23
-rw-r--r--ydb/core/tx/tiering/external_data.h1
-rw-r--r--ydb/core/tx/tiering/path_cleaner.cpp55
-rw-r--r--ydb/core/tx/tiering/path_cleaner.h44
-rw-r--r--ydb/core/tx/tiering/rule.h2
-rw-r--r--ydb/core/tx/tiering/tier_cleaner.cpp58
-rw-r--r--ydb/core/tx/tiering/tier_cleaner.h61
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp (renamed from ydb/core/tx/columnshard/ut_columnshard_tiers.cpp)261
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.h1
-rw-r--r--ydb/services/bg_tasks/service.cpp8
-rw-r--r--ydb/services/bg_tasks/service.h9
-rw-r--r--ydb/services/metadata/abstract/CMakeLists.txt1
-rw-r--r--ydb/services/metadata/service.cpp4
-rw-r--r--ydb/services/metadata/service.h2
37 files changed, 789 insertions, 219 deletions
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 23379eef9aa..40373be49b6 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -123,65 +123,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
});
}
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
- std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
-
- arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool());
- arrow::StringBuilder b2;
- arrow::StringBuilder b3;
- arrow::Int32Builder b4;
- arrow::StringBuilder b5;
-
- for (size_t i = 0; i < rowCount; ++i) {
- std::string uid("uid_" + std::to_string(tsBegin + i));
- std::string message("some prefix " + std::string(1024 + i % 200, 'x'));
- Y_VERIFY(b1.Append(tsBegin + i).ok());
- Y_VERIFY(b2.Append(std::to_string(pathIdBegin + i)).ok());
- Y_VERIFY(b3.Append(uid).ok());
- Y_VERIFY(b4.Append(i % 5).ok());
- Y_VERIFY(b5.Append(message).ok());
- }
-
- std::shared_ptr<arrow::TimestampArray> a1;
- std::shared_ptr<arrow::StringArray> a2;
- std::shared_ptr<arrow::StringArray> a3;
- std::shared_ptr<arrow::Int32Array> a4;
- std::shared_ptr<arrow::StringArray> a5;
-
- Y_VERIFY(b1.Finish(&a1).ok());
- Y_VERIFY(b2.Finish(&a2).ok());
- Y_VERIFY(b3.Finish(&a3).ok());
- Y_VERIFY(b4.Finish(&a4).ok());
- Y_VERIFY(b5.Finish(&a5).ok());
-
- return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 });
- }
-
- TString TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
- auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount);
- int64_t size;
- auto status = arrow::ipc::GetRecordBatchSize(*batch, &size);
- Y_VERIFY(status.ok());
-
- TString buf;
- buf.resize(size);
- auto writer = arrow::Buffer::GetWriter(arrow::MutableBuffer::Wrap(&buf[0], size));
- Y_VERIFY(writer.ok());
-
- // UNCOMPRESSED
- status = SerializeRecordBatch(*batch, arrow::ipc::IpcWriteOptions::Defaults(), (*writer).get());
- Y_VERIFY(status.ok());
- return buf;
- }
-
void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ TLocalHelper lHelper(kikimr.GetTestServer());
NYdb::NLongTx::TClient client(kikimr.GetDriver());
NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString());
auto txId = resBeginTx.GetResult().tx_id();
- TString data = TestBlob(pathIdBegin, tsBegin, rowCount);
+ TString data = lHelper.TestBlob(pathIdBegin, tsBegin, rowCount);
NLongTx::TLongTxWriteResult resWrite =
client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
@@ -191,34 +141,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
}
- void SendDataViaActorSystem(NActors::TTestActorRuntime* runtime, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
- std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
- TString serializedSchema = NArrow::SerializeSchema(*schema);
- Y_VERIFY(serializedSchema);
-
- auto batch = TestBlob(pathIdBegin, tsBegin, rowCount);
- Y_VERIFY(batch);
-
- Ydb::Table::BulkUpsertRequest request;
- request.mutable_arrow_batch_settings()->set_schema(serializedSchema);
- request.set_data(batch);
- request.set_table(testTable);
-
- size_t responses = 0;
- auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0));
- future.Subscribe([&](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable {
- ++responses;
- UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS);
- });
-
- TDispatchOptions options;
- options.CustomFinalCondition = [&]() {
- return responses >= 1;
- };
-
- runtime->DispatchEvents(options);
- }
-
TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it) {
TVector<THashMap<TString, NYdb::TValue>> rows;
@@ -1600,7 +1522,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ui32 insertRows = 0;
const ui32 iterationPackSize = 2000;
for (ui64 i = 0; i < numIterations; ++i) {
- SendDataViaActorSystem(runtime, "/Root/olapStore/olapTable", 0, 1000000 + i * 1000000, iterationPackSize);
+ TLocalHelper(*server).SendDataViaActorSystem("/Root/olapStore/olapTable", 0, 1000000 + i * 1000000, iterationPackSize);
insertRows += iterationPackSize;
}
@@ -1966,7 +1888,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TLocalHelper(*server).CreateTestOlapTable("selectTable", "selectStore", numShards, numShards);
ui32 insertRows = 0;
for(ui64 i = 0; i < numIterations; ++i) {
- SendDataViaActorSystem(runtime, "/Root/selectStore/selectTable", 0, 1000000 + i*1000000, 2000);
+ TLocalHelper(*server).SendDataViaActorSystem("/Root/selectStore/selectTable", 0, 1000000 + i*1000000, 2000);
insertRows += 2000;
}
@@ -2055,7 +1977,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TLocalHelper(*server).CreateTestOlapTable("largeOlapTable", "largeOlapStore", numShards, numShards);
ui32 insertRows = 0;
for(ui64 i = 0; i < numIterations; ++i) {
- SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000);
+ TLocalHelper(*server).SendDataViaActorSystem("/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000);
insertRows += 2000;
}
@@ -2121,7 +2043,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TLocalHelper(*server).CreateTestOlapTable("largeOlapTable", "largeOlapStore", numShards, numShards);
ui32 insertRows = 0;
for(ui64 i = 0; i < numIterations; ++i) {
- SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000);
+ TLocalHelper(*server).SendDataViaActorSystem("/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000);
insertRows += 2000;
}
@@ -2187,7 +2109,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ui32 insertRows = 0;
const ui32 iterationPackSize = 2000;
for (ui64 i = 0; i < numIterations; ++i) {
- SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i * 1000000, iterationPackSize);
+ TLocalHelper(*server).SendDataViaActorSystem("/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i * 1000000, iterationPackSize);
insertRows += iterationPackSize;
}
@@ -2274,7 +2196,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ui32 insertRows = 0;
for(ui64 i = 0; i < numIterations; ++i) {
- SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000);
+ TLocalHelper(*server).SendDataViaActorSystem("/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000);
insertRows += 2000;
}
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index 8cb848bfd07..b2b8c6b9a96 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -474,4 +474,6 @@ enum ETxTypes {
TXTYPE_ALTER_BLOB_DEPOT_RESULT = 78 [(TxTypeOpts) = {Name: "TxAlterBlobDepotResult"}];
TXTYPE_DROP_BLOB_DEPOT_RESULT = 79 [(TxTypeOpts) = {Name: "TxDropBlobDepotResult"}];
TXTYPE_BLOB_DEPOT_CONFIG_RESULT = 80 [(TxTypeOpts) = {Name: "TxBlobDepotConfigResult"}];
+
+ TXTYPE_ADD_BACKGROUND_TASK_RESULT = 81 [(TxTypeOpts) = {Name: "TxAddBackgroundTaskResult"}];
}
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index e8ffb753a6b..bdfeab0a086 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -889,6 +889,11 @@ message TS3Settings {
optional uint32 ConnectionTimeoutMs = 103;
};
+message TTaskCleaner {
+ optional uint64 PathId = 1;
+ optional TS3Settings StorageSettings = 2;
+}
+
message TBackupTask {
optional string TableName = 1;
optional uint64 TableId = 2;
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index c01581a8bb6..f8aa3decadc 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -926,5 +926,7 @@ message TActivity {
BLOB_DEPOT_BLOCKS_PROCESSOR_ACTOR = 581;
TEST_SHARD_LOAD_ACTOR = 582;
TEST_SHARD_VALIDATION_ACTOR = 583;
+ TX_TIERING_TIER_CLEANER = 584;
+ TX_TIERING_PATH_CLEANER = 585;
};
};
diff --git a/ydb/core/testlib/common_helper.cpp b/ydb/core/testlib/common_helper.cpp
index 2935060e0d7..45408b456cc 100644
--- a/ydb/core/testlib/common_helper.cpp
+++ b/ydb/core/testlib/common_helper.cpp
@@ -1,6 +1,12 @@
#include "cs_helper.h"
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
namespace NKikimr::Tests::NCommon {
void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) {
@@ -13,4 +19,34 @@ void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) {
runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(sender);
}
+void THelper::StartDataRequest(const TString& request) const {
+ NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([request](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(request
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+}
+
+void THelper::DropTable(const TString& tablePath) {
+ auto* runtime = Server.GetRuntime();
+ Ydb::Table::DropTableRequest request;
+ request.set_path(tablePath);
+ size_t responses = 0;
+ using TEvDropTableRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::DropTableRequest,
+ Ydb::Table::DropTableResponse>;
+ auto future = NRpcService::DoLocalRpc<TEvDropTableRequest>(std::move(request), "", "", runtime->GetActorSystem(0));
+ future.Subscribe([&](const NThreading::TFuture<Ydb::Table::DropTableResponse> f) mutable {
+ ++responses;
+ UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS);
+ });
+
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return responses >= 1;
+ };
+
+ runtime->DispatchEvents(options);
+}
+
}
diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h
index 5870a770c6f..25ffdf809bc 100644
--- a/ydb/core/testlib/common_helper.h
+++ b/ydb/core/testlib/common_helper.h
@@ -13,5 +13,9 @@ public:
: Server(server) {
}
+
+ void DropTable(const TString& tablePath);
+
+ void StartDataRequest(const TString& request) const;
};
}
diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp
index 6441df5be9e..98064014db6 100644
--- a/ydb/core/testlib/cs_helper.cpp
+++ b/ydb/core/testlib/cs_helper.cpp
@@ -1,7 +1,15 @@
#include "cs_helper.h"
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
#include <library/cpp/actors/core/event.h>
#include <library/cpp/testing/unittest/registar.h>
-#include <ydb/core/tx/tx_proxy/proxy.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/buffer.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_binary.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type_fwd.h>
namespace NKikimr::Tests::NCS {
@@ -38,4 +46,84 @@ void THelper::CreateTestOlapTable(TActorId sender, TString storeName, TString sc
WaitForSchemeOperation(sender, txId);
}
+std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() {
+ return std::make_shared<arrow::Schema>(
+ std::vector<std::shared_ptr<arrow::Field>>{
+ arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
+ arrow::field("resource_id", arrow::utf8()),
+ arrow::field("uid", arrow::utf8()),
+ arrow::field("level", arrow::int32()),
+ arrow::field("message", arrow::utf8())
+ });
+}
+
+TString THelper::TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount);
+ return NArrow::SerializeBatchNoCompression(batch);
+}
+
+void THelper::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ auto* runtime = Server.GetRuntime();
+ std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
+ TString serializedSchema = NArrow::SerializeSchema(*schema);
+ Y_VERIFY(serializedSchema);
+ auto batch = TestBlob(pathIdBegin, tsBegin, rowCount);
+ Y_VERIFY(batch);
+
+ Ydb::Table::BulkUpsertRequest request;
+ request.mutable_arrow_batch_settings()->set_schema(serializedSchema);
+ request.set_data(batch);
+ request.set_table(testTable);
+
+ size_t responses = 0;
+ using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest,
+ Ydb::Table::BulkUpsertResponse>;
+ auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0));
+ future.Subscribe([&](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable {
+ ++responses;
+ UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS);
+ });
+
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return responses >= 1;
+ };
+
+ runtime->DispatchEvents(options);
+}
+
+std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
+
+ arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool());
+ arrow::StringBuilder b2;
+ arrow::StringBuilder b3;
+ arrow::Int32Builder b4;
+ arrow::StringBuilder b5;
+
+ for (size_t i = 0; i < rowCount; ++i) {
+ std::string uid("uid_" + std::to_string(tsBegin + i));
+ std::string message("some prefix " + std::string(1024 + i % 200, 'x'));
+ Y_VERIFY(b1.Append(tsBegin + i).ok());
+ Y_VERIFY(b2.Append(std::to_string(pathIdBegin + i)).ok());
+ Y_VERIFY(b3.Append(uid).ok());
+ Y_VERIFY(b4.Append(i % 5).ok());
+ Y_VERIFY(b5.Append(message).ok());
+ }
+
+ std::shared_ptr<arrow::TimestampArray> a1;
+ std::shared_ptr<arrow::StringArray> a2;
+ std::shared_ptr<arrow::StringArray> a3;
+ std::shared_ptr<arrow::Int32Array> a4;
+ std::shared_ptr<arrow::StringArray> a5;
+
+ Y_VERIFY(b1.Finish(&a1).ok());
+ Y_VERIFY(b2.Finish(&a2).ok());
+ Y_VERIFY(b3.Finish(&a3).ok());
+ Y_VERIFY(b4.Finish(&a4).ok());
+ Y_VERIFY(b5.Finish(&a5).ok());
+
+ return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 });
+}
+
}
diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h
index 3c0123ea2b2..74efd67ed1c 100644
--- a/ydb/core/testlib/cs_helper.h
+++ b/ydb/core/testlib/cs_helper.h
@@ -1,15 +1,23 @@
#pragma once
#include "common_helper.h"
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::Tests::NCS {
class THelper: public NCommon::THelper {
private:
using TBase = NCommon::THelper;
+
+ std::shared_ptr<arrow::Schema> GetArrowSchema();
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
+
public:
using TBase::TBase;
void CreateTestOlapStore(TActorId sender, TString scheme);
void CreateTestOlapTable(TActorId sender, TString storeName, TString scheme);
+ TString TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
+ void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
};
}
diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
index 14d9ad8173b..ac703050476 100644
--- a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
+++ b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
@@ -40,7 +40,6 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
)
add_test(
NAME
diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt
index 7e642963ed5..a2b5d071235 100644
--- a/ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/ut/CMakeLists.linux-aarch64.txt
@@ -42,7 +42,6 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
)
add_test(
NAME
diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
index 1b40ae04947..4de436d1222 100644
--- a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
+++ b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
@@ -44,7 +44,6 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
)
add_test(
NAME
diff --git a/ydb/core/tx/schemeshard/CMakeLists.txt b/ydb/core/tx/schemeshard/CMakeLists.txt
index eba7306656c..d227762db86 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.txt
@@ -89,6 +89,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC
ydb-library-login
library-login-protos
library-yql-minikql
+ ydb-services-bg_tasks
)
target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard.cpp
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
index bdb673602af..46cde5886d0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
@@ -3,6 +3,7 @@
#include "schemeshard_impl.h"
#include <ydb/core/base/subdomain.h>
+#include <ydb/core/tx/tiering/cleaner_task.h>
namespace NKikimr {
namespace NSchemeShard {
@@ -262,23 +263,7 @@ private:
<< " operationId#" << OperationId;
}
-public:
- TProposedDeleteParts(TOperationId id)
- : OperationId(id)
- {
- IgnoreMessages(DebugHint(),
- {TEvColumnShard::TEvProposeTransactionResult::EventType,
- TEvColumnShard::TEvNotifyTxCompletionResult::EventType,
- TEvPrivate::TEvOperationPlan::EventType});
- }
-
- bool ProgressState(TOperationContext& context) override {
- TTabletId ssId = context.SS->SelfTabletId();
-
- LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- DebugHint() << " ProgressState"
- << ", at schemeshard: " << ssId);
-
+ bool Finish(TOperationContext& context) {
TTxState* txState = context.SS->FindTx(OperationId);
Y_VERIFY(txState);
Y_VERIFY(txState->TxType == TTxState::TxDropColumnTable);
@@ -303,6 +288,40 @@ public:
context.OnComplete.DoneOperation(OperationId);
return true;
}
+public:
+ TProposedDeleteParts(TOperationId id)
+ : OperationId(id)
+ {
+ IgnoreMessages(DebugHint(),
+ {TEvColumnShard::TEvProposeTransactionResult::EventType,
+ TEvColumnShard::TEvNotifyTxCompletionResult::EventType,
+ TEvPrivate::TEvOperationPlan::EventType});
+ }
+
+ bool HandleReply(NBackgroundTasks::TEvAddTaskResult::TPtr& ev, TOperationContext& context) override {
+ Y_VERIFY(ev->Get()->IsSuccess());
+ return Finish(context);
+ }
+
+ bool ProgressState(TOperationContext& context) override {
+ TTabletId ssId = context.SS->SelfTabletId();
+ TTxState* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
+ Y_VERIFY(txState->TxType == TTxState::TxDropColumnTable);
+
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " ProgressState"
+ << ", at schemeshard: " << ssId);
+
+ if (NBackgroundTasks::TServiceOperator::IsEnabled()) {
+ NBackgroundTasks::TTask task(std::make_shared<NColumnShard::NTiers::TTaskCleanerActivity>(txState->TargetPathId.LocalPathId), nullptr);
+ task.SetId(OperationId.SerializeToString());
+ context.SS->SelfId().Send(NBackgroundTasks::MakeServiceId(context.SS->SelfId().NodeId()), new NBackgroundTasks::TEvAddTask(std::move(task)));
+ return false;
+ } else {
+ return Finish(context);
+ }
+ }
};
class TDropColumnTable : public TSubOperation {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp
index 1e769353310..e30f7511a83 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp
@@ -7,11 +7,18 @@ namespace NSchemeShard {
template<class T>
struct TDebugEvent {
- static TString ToString (const typename T::TPtr & ev) {
+ static TString ToString(const typename T::TPtr& ev) {
return ev->Get()->Record.ShortDebugString();
}
};
+template<>
+struct TDebugEvent<NBackgroundTasks::TEvAddTaskResult> {
+ static TString ToString(const NBackgroundTasks::TEvAddTaskResult::TPtr& ev) {
+ return ev->Get()->GetDebugString();
+ }
+};
+
template <>
struct TDebugEvent<TEvPrivate::TEvOperationPlan> {
static TString ToString (const TEvPrivate::TEvOperationPlan::TPtr& ev) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
index cfe934dc32e..543a19aa69d 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
@@ -21,6 +21,7 @@
#include <ydb/core/blockstore/core/blockstore.h>
#include <ydb/core/filestore/core/filestore.h>
+#include <ydb/services/bg_tasks/service.h>
#include <util/generic/ptr.h>
#include <util/generic/set.h>
@@ -39,6 +40,7 @@
action(TEvDataShard::TEvSplitPartitioningChangedAck, NSchemeShard::TXTYPE_SPLIT_PARTITIONING_CHANGED_DST_ACK) \
\
action(TEvColumnShard::TEvProposeTransactionResult, NSchemeShard::TXTYPE_COLUMNSHARD_PROPOSE_RESULT) \
+ action(NBackgroundTasks::TEvAddTaskResult, NSchemeShard::TXTYPE_ADD_BACKGROUND_TASK_RESULT) \
action(TEvColumnShard::TEvNotifyTxCompletionResult, NSchemeShard::TXTYPE_COLUMNSHARD_NOTIFY_TX_COMPLETION_RESULT) \
\
action(NSequenceShard::TEvSequenceShard::TEvCreateSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_CREATE_SEQUENCE_RESULT) \
diff --git a/ydb/core/tx/schemeshard/schemeshard_identificators.h b/ydb/core/tx/schemeshard/schemeshard_identificators.h
index 83472071526..989716527fa 100644
--- a/ydb/core/tx/schemeshard/schemeshard_identificators.h
+++ b/ydb/core/tx/schemeshard/schemeshard_identificators.h
@@ -75,6 +75,34 @@ public:
explicit operator bool() const {
return GetTxId() != InvalidTxId && GetSubTxId() != InvalidSubTxId;
}
+
+ TString SerializeToString() const {
+ return "SSO:" + ::ToString(GetTxId().GetValue()) + ":" + ::ToString(GetSubTxId());
+ }
+
+ bool DeserializeFromString(const TString& data) {
+ TStringBuf sb(data.data(), data.size());
+ if (!sb.StartsWith("SSO:")) {
+ return false;
+ }
+ sb.Skip(4);
+ TStringBuf l;
+ TStringBuf r;
+ if (!sb.TrySplit(':', l, r)) {
+ return false;
+ }
+ ui64 txId;
+ TSubTxId subTxId;
+ if (!TryFromString(l, txId)) {
+ return false;
+ }
+ if (!TryFromString(r, subTxId)) {
+ return false;
+ }
+ first = TTxId(txId);
+ second = subTxId;
+ return true;
+ }
};
constexpr TOperationId InvalidOperationId = TOperationId(InvalidTxId, InvalidSubTxId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index de6e27a2eee..66032109e2b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -4056,6 +4056,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
//
HFuncTraced(TEvColumnShard::TEvProposeTransactionResult, Handle);
+ HFuncTraced(NBackgroundTasks::TEvAddTaskResult, Handle);
HFuncTraced(TEvColumnShard::TEvNotifyTxCompletionResult, Handle);
// sequence shard
@@ -5020,6 +5021,38 @@ void TSchemeShard::Handle(TEvBlobDepot::TEvApplyConfigResult::TPtr& ev, const TA
}
}
+void TSchemeShard::Handle(NBackgroundTasks::TEvAddTaskResult::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Handle NBackgroundTasks::TEvAddTaskResult"
+ << ", at schemeshard: " << TabletID()
+ << ", message: " << ev->Get()->GetDebugString());
+ TOperationId id;
+ if (!id.DeserializeFromString(ev->Get()->GetTaskId())) {
+ LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Got NBackgroundTasks::TEvAddTaskResult cannot parse operation id in result"
+ << ", message: " << ev->Get()->GetDebugString()
+ << ", at schemeshard: " << TabletID());
+ return;
+ }
+ if (!Operations.contains(id.GetTxId())) {
+ LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Got NBackgroundTasks::TEvAddTaskResult for unknown txId, ignore it"
+ << ", txId: " << id.SerializeToString()
+ << ", message: " << ev->Get()->GetDebugString()
+ << ", at schemeshard: " << TabletID());
+ return;
+ }
+ if (!ev->Get()->IsSuccess()) {
+ LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Got NBackgroundTasks::TEvAddTaskResult cannot execute"
+ << ", txId: " << id.SerializeToString()
+ << ", message: " << ev->Get()->GetDebugString()
+ << ", at schemeshard: " << TabletID());
+ return;
+ }
+ Execute(CreateTxOperationReply(id, ev), ctx);
+}
+
void TSchemeShard::Handle(TEvColumnShard::TEvProposeTransactionResult::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Handle TEvProposeTransactionResult"
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 859562ecfb1..572cb913bbc 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -51,6 +51,8 @@
#include <ydb/library/login/login.h>
+#include <ydb/services/bg_tasks/service.h>
+
#include <util/generic/ptr.h>
namespace NKikimr {
@@ -921,7 +923,8 @@ public:
void Handle(TEvPrivate::TEvSubscribeToShardDeletion::TPtr &ev, const TActorContext &ctx);
void Handle(TEvHive::TEvDeleteOwnerTabletsReply::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPersQueue::TEvDropTabletReply::TPtr &ev, const TActorContext &ctx);
- void Handle(TEvColumnShard::TEvProposeTransactionResult::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(NBackgroundTasks::TEvAddTaskResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvColumnShard::TEvNotifyTxCompletionResult::TPtr &ev, const TActorContext &ctx);
void Handle(NSequenceShard::TEvSequenceShard::TEvCreateSequenceResult::TPtr &ev, const TActorContext &ctx);
void Handle(NSequenceShard::TEvSequenceShard::TEvDropSequenceResult::TPtr &ev, const TActorContext &ctx);
diff --git a/ydb/core/tx/tiering/CMakeLists.txt b/ydb/core/tx/tiering/CMakeLists.txt
index 48b8969a74c..2a598bd407c 100644
--- a/ydb/core/tx/tiering/CMakeLists.txt
+++ b/ydb/core/tx/tiering/CMakeLists.txt
@@ -18,9 +18,13 @@ target_link_libraries(core-tx-tiering PUBLIC
core-tablet_flat-protos
ydb-core-wrappers
api-protos
+ services-bg_tasks-abstract
ydb-services-metadata
)
target_sources(core-tx-tiering PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_cleaner.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/path_cleaner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/decoder.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp
@@ -28,3 +32,21 @@ target_sources(core-tx-tiering PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/s3_actor.cpp
)
+
+add_global_library_for(core-tx-tiering.global core-tx-tiering)
+target_link_libraries(core-tx-tiering.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-json-writer
+ ydb-core-blobstorage
+ ydb-core-protos
+ core-tablet_flat-protos
+ ydb-core-wrappers
+ api-protos
+ services-bg_tasks-abstract
+ ydb-services-metadata
+)
+target_sources(core-tx-tiering.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/cleaner_task.cpp
+)
diff --git a/ydb/core/tx/tiering/cleaner_task.cpp b/ydb/core/tx/tiering/cleaner_task.cpp
new file mode 100644
index 00000000000..1872a955349
--- /dev/null
+++ b/ydb/core/tx/tiering/cleaner_task.cpp
@@ -0,0 +1,25 @@
+#include "cleaner_task.h"
+#include "path_cleaner.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+TTaskCleanerActivity::TFactory::TRegistrator<TTaskCleanerActivity> TTaskCleanerActivity::Registrator(TTaskCleanerActivity::GetClassNameStatic());
+
+NKikimrSchemeOp::TTaskCleaner TTaskCleanerActivity::DoSerializeToProto() const {
+ NKikimrSchemeOp::TTaskCleaner result;
+ result.SetPathId(PathId);
+ return result;
+}
+
+bool TTaskCleanerActivity::DoDeserializeFromProto(const NKikimrSchemeOp::TTaskCleaner& protoData) {
+ PathId = protoData.GetPathId();
+ return true;
+}
+
+void TTaskCleanerActivity::DoExecute(NBackgroundTasks::ITaskExecutorController::TPtr controller,
+ const NBackgroundTasks::TTaskStateContainer& /*state*/)
+{
+ TActivationContext::AsActorContext().Register(new TPathCleaner(PathId, controller));
+}
+
+}
diff --git a/ydb/core/tx/tiering/cleaner_task.h b/ydb/core/tx/tiering/cleaner_task.h
new file mode 100644
index 00000000000..d036fc8aebf
--- /dev/null
+++ b/ydb/core/tx/tiering/cleaner_task.h
@@ -0,0 +1,37 @@
+#pragma once
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/bg_tasks/abstract/activity.h>
+#include <ydb/services/bg_tasks/abstract/interface.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TTaskCleanerActivity: public NBackgroundTasks::IProtoStringSerializable<NKikimrSchemeOp::TTaskCleaner, NBackgroundTasks::ITaskActivity> {
+private:
+ YDB_READONLY(ui64, PathId, 0);
+ static TFactory::TRegistrator<TTaskCleanerActivity> Registrator;
+protected:
+ virtual NKikimrSchemeOp::TTaskCleaner DoSerializeToProto() const override;
+ virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TTaskCleaner& protoData) override;
+
+ virtual void DoExecute(NBackgroundTasks::ITaskExecutorController::TPtr controller,
+ const NBackgroundTasks::TTaskStateContainer& /*state*/) override;
+public:
+ TTaskCleanerActivity() = default;
+
+ TTaskCleanerActivity(const ui64 pathId)
+ : PathId(pathId)
+ {
+
+ }
+
+ static TString GetClassNameStatic() {
+ return "sso_path_tiers_cleaner";
+ }
+
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+};
+
+}
diff --git a/ydb/core/tx/tiering/common.cpp b/ydb/core/tx/tiering/common.cpp
new file mode 100644
index 00000000000..3c7605000d9
--- /dev/null
+++ b/ydb/core/tx/tiering/common.cpp
@@ -0,0 +1,5 @@
+#include "common.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+}
diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h
new file mode 100644
index 00000000000..a891f6acf8d
--- /dev/null
+++ b/ydb/core/tx/tiering/common.h
@@ -0,0 +1,14 @@
+#pragma once
+#include <ydb/core/base/events.h>
+
+#include <library/cpp/actors/core/events.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+enum EEvents {
+ EvTierCleared = EventSpaceBegin(TKikimrEvents::ES_TIERING),
+ EvEnd
+};
+
+static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_TIERING), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TIERING)");
+}
diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp
index ebabf12bcb3..eab129ed7b1 100644
--- a/ydb/core/tx/tiering/external_data.cpp
+++ b/ydb/core/tx/tiering/external_data.cpp
@@ -17,7 +17,7 @@ bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQuery
for (auto&& r : rawData.rows()) {
TTierConfig config;
if (!config.DeserializeFromRecord(decoder, r)) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse tier config from snapshot";
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot parse tier config from snapshot";
continue;
}
TierConfigs.emplace(config.GetConfigId(), config);
@@ -31,7 +31,7 @@ bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQuery
for (auto&& r : rawData.rows()) {
TTieringRule tr;
if (!tr.DeserializeFromRecord(decoder, r)) {
- ALS_WARN(NKikimrServices::TX_COLUMNSHARD) << "cannot parse record for tiering info";
+ ALS_WARN(NKikimrServices::TX_TIERING) << "cannot parse record for tiering info";
continue;
}
rulesLocal.emplace_back(std::move(tr));
@@ -69,6 +69,25 @@ const TTableTiering* TConfigsSnapshot::GetTableTiering(const TString& tablePath)
}
}
+std::vector<NKikimr::NColumnShard::NTiers::TTierConfig> TConfigsSnapshot::GetTiersForPathId(const ui64 pathId) const {
+ std::vector<TTierConfig> result;
+ std::set<TString> readyIds;
+ for (auto&& i : TableTierings) {
+ for (auto&& r : i.second.GetRules()) {
+ if (r.GetTablePathId() == pathId) {
+ auto it = TierConfigs.find(r.GetTierId());
+ if (it == TierConfigs.end()) {
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency tiering for " << r.GetTierId();
+ continue;
+ } else if (readyIds.emplace(r.GetTierId()).second) {
+ result.emplace_back(it->second);
+ }
+ }
+ }
+ }
+ return result;
+}
+
TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const {
TVector<NMetadataProvider::ITableModifier::TPtr> result;
{
diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h
index bb03ed8c9f4..3a01c87c482 100644
--- a/ydb/core/tx/tiering/external_data.h
+++ b/ydb/core/tx/tiering/external_data.h
@@ -21,6 +21,7 @@ protected:
virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
virtual TString DoSerializeToString() const override;
public:
+ std::vector<TTierConfig> GetTiersForPathId(const ui64 pathId) const;
const TTableTiering* GetTableTiering(const TString& tablePath) const;
void RemapTablePathToId(const TString& path, const ui64 pathId);
std::optional<TTierConfig> GetValue(const TString& key) const;
diff --git a/ydb/core/tx/tiering/path_cleaner.cpp b/ydb/core/tx/tiering/path_cleaner.cpp
new file mode 100644
index 00000000000..5e6d54ad488
--- /dev/null
+++ b/ydb/core/tx/tiering/path_cleaner.cpp
@@ -0,0 +1,55 @@
+#include "path_cleaner.h"
+#include "external_data.h"
+
+#include <ydb/services/metadata/service.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+void TPathCleaner::Handle(TEvTierCleared::TPtr& ev) {
+ TiersWait.erase(ev->Get()->GetTierName());
+ Truncated |= ev->Get()->GetTruncated();
+ if (TiersWait.empty()) {
+ if (Truncated) {
+ Controller->TaskInterrupted(nullptr);
+ } else {
+ Controller->TaskFinished();
+ }
+ }
+}
+
+NMetadataProvider::ISnapshotParser::TPtr TPathCleaner::GetTieringSnapshotParser() const {
+ if (!ExternalDataManipulation) {
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
+ auto edmPtr = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation);
+ edmPtr->Start(edmPtr);
+ }
+ return ExternalDataManipulation;
+}
+
+void TPathCleaner::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
+ auto configsSnapshot = ev->Get()->GetSnapshotAs<TConfigsSnapshot>();
+ Y_VERIFY(configsSnapshot);
+ Send(NMetadataProvider::MakeServiceId(SelfId().NodeId()), new NMetadataProvider::TEvUnsubscribeExternal(GetTieringSnapshotParser()));
+ std::vector<TTierConfig> configs = configsSnapshot->GetTiersForPathId(PathId);
+ for (auto&& i : configs) {
+ auto config = NWrappers::IExternalStorageConfig::Construct(i.GetProtoConfig().GetObjectStorage());
+ if (!config) {
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot construct storage config for " << i.GetTierName();
+ continue;
+ }
+ Register(new TTierCleaner(i.GetTierName(), SelfId(), PathId, config));
+ TiersWait.emplace(i.GetTierName());
+ }
+}
+
+void TPathCleaner::Bootstrap() {
+ Become(&TPathCleaner::StateMain);
+ Send(NMetadataProvider::MakeServiceId(SelfId().NodeId()), new NMetadataProvider::TEvSubscribeExternal(GetTieringSnapshotParser()));
+}
+
+TPathCleaner::TPathCleaner(const ui64 pathId, NBackgroundTasks::ITaskExecutorController::TPtr controller)
+ : PathId(pathId)
+ , Controller(controller) {
+}
+
+}
diff --git a/ydb/core/tx/tiering/path_cleaner.h b/ydb/core/tx/tiering/path_cleaner.h
new file mode 100644
index 00000000000..44963a3e986
--- /dev/null
+++ b/ydb/core/tx/tiering/path_cleaner.h
@@ -0,0 +1,44 @@
+#pragma once
+#include "common.h"
+#include "tier_cleaner.h"
+
+#include <ydb/core/wrappers/abstract.h>
+#include <ydb/core/wrappers/events/list_objects.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/bg_tasks/abstract/activity.h>
+#include <ydb/services/metadata/abstract/common.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TPathCleaner: public TActorBootstrapped<TPathCleaner> {
+private:
+ YDB_READONLY(ui64, PathId, 0);
+ bool Truncated = false;
+ std::set<TString> TiersWait;
+ NBackgroundTasks::ITaskExecutorController::TPtr Controller;
+ mutable NMetadataProvider::ISnapshotParser::TPtr ExternalDataManipulation;
+ NMetadataProvider::ISnapshotParser::TPtr GetTieringSnapshotParser() const;
+protected:
+ void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev);
+ void Handle(TEvTierCleared::TPtr& ev);
+public:
+ TPathCleaner(const ui64 pathId, NBackgroundTasks::ITaskExecutorController::TPtr controller);
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::TX_TIERING_PATH_CLEANER;
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
+ hFunc(TEvTierCleared, Handle);
+ default:
+ break;
+ }
+ }
+
+ void Bootstrap();
+};
+}
diff --git a/ydb/core/tx/tiering/rule.h b/ydb/core/tx/tiering/rule.h
index 4481f500925..1187efc8ff1 100644
--- a/ydb/core/tx/tiering/rule.h
+++ b/ydb/core/tx/tiering/rule.h
@@ -89,7 +89,7 @@ public:
if (Rules.size()) {
Y_VERIFY(Rules.back().GetDurationForEvict() <= tr.GetDurationForEvict());
if (Column != tr.GetColumn()) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "inconsistency rule column: " <<
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency rule column: " <<
TablePath << "/" << Column << " != " << tr.GetColumn();
return;
}
diff --git a/ydb/core/tx/tiering/tier_cleaner.cpp b/ydb/core/tx/tiering/tier_cleaner.cpp
new file mode 100644
index 00000000000..e7b8f94d850
--- /dev/null
+++ b/ydb/core/tx/tiering/tier_cleaner.cpp
@@ -0,0 +1,58 @@
+#include "tier_cleaner.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+void TTierCleaner::Handle(NWrappers::NExternalStorage::TEvListObjectsResponse::TPtr& ev) {
+ Truncated = ev->Get()->GetResult().GetIsTruncated();
+ auto request = Aws::S3::Model::DeleteObjectsRequest();
+
+ Aws::S3::Model::Delete deleteInfo;
+ for (auto&& i : ev->Get()->GetResult().GetContents()) {
+ Aws::S3::Model::ObjectIdentifier id;
+ id.WithKey(i.GetKey());
+ deleteInfo.AddObjects(std::move(id));
+ }
+ if (ev->Get()->GetResult().GetContents().empty()) {
+ Send(OwnerId, new TEvTierCleared(TierName, false));
+ } else {
+ request.WithDelete(deleteInfo);
+ Send(SelfId(), new NWrappers::TEvExternalStorage::TEvDeleteObjectsRequest(request));
+ }
+}
+
+void TTierCleaner::Handle(NWrappers::NExternalStorage::TEvDeleteObjectsRequest::TPtr& ev) {
+ Storage->Execute(ev);
+}
+
+void TTierCleaner::Handle(NWrappers::NExternalStorage::TEvDeleteObjectsResponse::TPtr& ev) {
+ if (!ev->Get()->IsSuccess()) {
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot remove objects pack: " << ev->Get()->GetResult();
+ Send(OwnerId, new TEvTierCleared(TierName, true));
+ } else {
+ Send(OwnerId, new TEvTierCleared(TierName, Truncated));
+ }
+}
+
+void TTierCleaner::Handle(NWrappers::NExternalStorage::TEvListObjectsRequest::TPtr& ev) {
+ Storage->Execute(ev);
+}
+
+void TTierCleaner::Bootstrap() {
+ Become(&TTierCleaner::StateMain);
+
+ auto request = Aws::S3::Model::ListObjectsRequest()
+ .WithPrefix("S3-" + ::ToString(PathId));
+
+ Send(SelfId(), new NWrappers::TEvExternalStorage::TEvListObjectsRequest(request));
+}
+
+TTierCleaner::TTierCleaner(const TString& tierName, const TActorId& ownerId,
+ const ui64 pathId, NWrappers::IExternalStorageConfig::TPtr storageConfig)
+ : TierName(tierName)
+ , OwnerId(ownerId)
+ , PathId(pathId)
+ , StorageConfig(storageConfig) {
+ Storage = StorageConfig->ConstructStorageOperator();
+ Y_VERIFY(Storage);
+}
+}
diff --git a/ydb/core/tx/tiering/tier_cleaner.h b/ydb/core/tx/tiering/tier_cleaner.h
new file mode 100644
index 00000000000..9c8d421ad08
--- /dev/null
+++ b/ydb/core/tx/tiering/tier_cleaner.h
@@ -0,0 +1,61 @@
+#pragma once
+#include "common.h"
+
+#include <ydb/core/wrappers/abstract.h>
+#include <ydb/core/wrappers/events/list_objects.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/bg_tasks/abstract/activity.h>
+#include <ydb/services/metadata/abstract/common.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TEvTierCleared: public TEventLocal<TEvTierCleared, EEvents::EvTierCleared> {
+private:
+ YDB_READONLY_DEF(TString, TierName);
+ YDB_READONLY(bool, Truncated, false);
+public:
+ TEvTierCleared(const TString& tierName, const bool truncated)
+ : TierName(tierName)
+ , Truncated(truncated)
+ {
+
+ }
+};
+
+class TTierCleaner: public TActorBootstrapped<TTierCleaner> {
+private:
+ const TString TierName;
+ const TActorId OwnerId;
+ const ui64 PathId = 0;
+ NWrappers::NExternalStorage::IExternalStorageConfig::TPtr StorageConfig;
+ NWrappers::NExternalStorage::IExternalStorageOperator::TPtr Storage;
+ bool Truncated = true;
+protected:
+ void Handle(NWrappers::NExternalStorage::TEvListObjectsResponse::TPtr& ev);
+ void Handle(NWrappers::NExternalStorage::TEvDeleteObjectsResponse::TPtr& ev);
+ void Handle(NWrappers::NExternalStorage::TEvListObjectsRequest::TPtr& ev);
+ void Handle(NWrappers::NExternalStorage::TEvDeleteObjectsRequest::TPtr& ev);
+public:
+ TTierCleaner(const TString& tierName, const TActorId& ownerId, const ui64 pathId,
+ NWrappers::IExternalStorageConfig::TPtr storageConfig);
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::TX_TIERING_TIER_CLEANER;
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NWrappers::NExternalStorage::TEvListObjectsResponse, Handle);
+ hFunc(NWrappers::NExternalStorage::TEvDeleteObjectsResponse, Handle);
+ hFunc(NWrappers::NExternalStorage::TEvListObjectsRequest, Handle);
+ hFunc(NWrappers::NExternalStorage::TEvDeleteObjectsRequest, Handle);
+ default:
+ break;
+ }
+ }
+
+ void Bootstrap();
+};
+}
diff --git a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 851166f30d4..3b1caea1d88 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -1,4 +1,3 @@
-#include "columnshard_ut_common.h"
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/tx/tiering/external_data.h>
@@ -6,11 +5,14 @@
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/core/wrappers/fake_storage.h>
+#include <ydb/library/accessor/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/services/metadata/service.h>
#include <library/cpp/actors/core/av_bootstrapped.h>
#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/testing/unittest/registar.h>
#include <util/system/hostname.h>
@@ -135,7 +137,9 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
for (const TInstant start = Now(); !IsFound() && Now() - start < TDuration::Seconds(10); ) {
runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
+ runtime.UpdateCurrentTime(Now());
}
+ runtime.SetObserverFunc(TTestActorRuntime::DefaultObserverFunc);
Y_VERIFY(IsFound());
}
@@ -225,46 +229,19 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
TTestCSEmulator* emulator = new TTestCSEmulator;
emulator->MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
runtime.Register(emulator);
- {
- const TInstant start = Now();
- while (Now() - start < TDuration::Seconds(10)) {
- runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
- Cerr << "Step " << event->Type << Endl;
- return false;
- }, {}, TDuration::Seconds(1));
- Sleep(TDuration::Seconds(1));
- Cerr << "Step finished" << Endl;
- }
- }
+ runtime.SimulateSleep(TDuration::Seconds(10));
+ Cerr << "Initialization finished" << Endl;
- {
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto session = f.GetValueSync().GetSession();
- session.ExecuteDataQuery(
- "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
- }
- {
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto session = f.GetValueSync().GetSession();
- session.ExecuteDataQuery(
- "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
- "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
- }
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')");
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
const TInstant start = Now();
while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
- runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
+ runtime.SimulateSleep(TDuration::Seconds(1));
}
Y_VERIFY(emulator->IsFound());
}
- //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
- //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
}
Y_UNIT_TEST(DSConfigs) {
@@ -295,36 +272,14 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_INFO);
-// runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
- for (const TInstant start = Now(); Now() - start < TDuration::Seconds(10); ) {
- runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
- Cerr << "Step " << event->Type << Endl;
- return false;
- }, {}, TDuration::Seconds(1));
- Sleep(TDuration::Seconds(1));
- Cerr << "Step finished" << Endl;
- }
-
- {
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto session = f.GetValueSync().GetSession();
- session.ExecuteDataQuery(
- "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
- }
- {
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto session = f.GetValueSync().GetSession();
- session.ExecuteDataQuery(
- "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
- "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
- }
+ // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
+ runtime.SimulateSleep(TDuration::Seconds(10));
+ Cerr << "Initialization finished" << Endl;
+
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')");
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
{
TTestCSEmulator emulator;
emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
@@ -332,27 +287,10 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
emulator.CheckRuntime(runtime);
}
- {
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto session = f.GetValueSync().GetSession();
- session.ExecuteDataQuery(
- "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr + "')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
- }
- {
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto session = f.GetValueSync().GetSession();
- session.ExecuteDataQuery(
- "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
- "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
- }
-
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr + "')");
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')");
{
TTestCSEmulator emulator;
emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
@@ -361,24 +299,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
emulator.CheckRuntime(runtime);
}
- {
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto session = f.GetValueSync().GetSession();
- session.ExecuteDataQuery(
- "DELETE FROM `/Root/.external_data/tiers`"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
- }
- {
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto session = f.GetValueSync().GetSession();
- session.ExecuteDataQuery(
- "DELETE FROM `/Root/.external_data/tiering`"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
- }
+ lHelper.StartDataRequest("DELETE FROM `/Root/.external_data/tiers`");
+ lHelper.StartDataRequest("DELETE FROM `/Root/.external_data/tiering`");
{
TTestCSEmulator emulator;
emulator.SetTieringsCount(0);
@@ -388,6 +310,137 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
//runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
//runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
}
+//#define S3_TEST_USAGE
+#ifdef S3_TEST_USAGE
+ const TString TierConfigProtoStr =
+ R"(
+ Name : "fakeTier"
+ ObjectStorage : {
+ Scheme: HTTP
+ VerifySSL: false
+ Endpoint: "storage.cloud-preprod.yandex.net"
+ Bucket: "tiering-test-01"
+ AccessKey: "..."
+ SecretKey: "..."
+ ProxyHost: "localhost"
+ ProxyPort: 8080
+ ProxyScheme: HTTP
+ }
+ )";
+ const TString TierEndpoint = "storage.cloud-preprod.yandex.net";
+#else
+ const TString TierConfigProtoStr =
+ R"(
+ Name : "fakeTier"
+ ObjectStorage : {
+ Endpoint: "fake"
+ Bucket: "fake"
+ }
+ )";
+ const TString TierEndpoint = "fake";
+#endif
+
+ Y_UNIT_TEST(TieringUsage) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(true)
+ .SetEnableBackgroundTasks(true)
+ .SetEnableOlapSchemaOperations(true);
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+ TLocalHelper lHelper(*server);
+ lHelper.CreateTestOlapTable("olapTable");
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
+ runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::BG_TASKS, NLog::PRI_DEBUG);
+ // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
+ Cerr << "Wait initialization" << Endl;
+ runtime.SimulateSleep(TDuration::Seconds(20));
+ Cerr << "Initialization finished" << Endl;
+
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'fakeTier1', '" + TierConfigProtoStr + "')");
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'fakeTier2', '" + TierConfigProtoStr + "')");
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ "VALUES ('/Root/olapStore', 'fakeTier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
+ lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ "VALUES ('/Root/olapStore', 'fakeTier2', '/Root/olapStore/olapTable', 'timestamp', '20d')");
+ {
+ TTestCSEmulator emulator;
+ emulator.MutableCheckers().emplace("/Root/olapStore.fakeTier1", TJsonChecker("Name", "fakeTier"));
+ emulator.MutableCheckers().emplace("/Root/olapStore.fakeTier2", TJsonChecker("ObjectStorage.Endpoint", TierEndpoint));
+ emulator.SetTieringsCount(2);
+ emulator.CheckRuntime(runtime);
+ }
+ Cerr << "Insert..." << Endl;
+ const TInstant pkStart = Now() - TDuration::Days(15);
+ ui32 idx = 0;
+ lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", 0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 2000);
+ {
+ const TInstant start = Now();
+ bool check = false;
+ while (Now() - start < TDuration::Seconds(60)) {
+ Cerr << "Waiting..." << Endl;
+#ifndef S3_TEST_USAGE
+ if (Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetSize()) {
+ check = true;
+ Cerr << "Fake storage filled" << Endl;
+ break;
+ }
+#else
+ check = true;
+#endif
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(check);
+ }
+#ifdef S3_TEST_USAGE
+ Cerr << "storage initialized..." << Endl;
+#endif
+
+ lHelper.DropTable("/Root/olapStore/olapTable");
+ {
+ const TInstant start = Now();
+ bool check = false;
+ while (Now() - start < TDuration::Seconds(60)) {
+ Cerr << "Cleaning waiting..." << Endl;
+#ifndef S3_TEST_USAGE
+ if (!Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetSize()) {
+ check = true;
+ Cerr << "Fake storage clean" << Endl;
+ break;
+ }
+#else
+ check = true;
+#endif
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(check);
+ }
+#ifndef S3_TEST_USAGE
+ Y_VERIFY(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucketsCount() == 1);
+#endif
+ }
}
}
diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h
index 1a07c093991..4b84cd589e1 100644
--- a/ydb/services/bg_tasks/ds_table/executor.h
+++ b/ydb/services/bg_tasks/ds_table/executor.h
@@ -94,6 +94,7 @@ public:
: TBase(config.GetRequestConfig())
, Config(config)
{
+ TServiceOperator::Register();
}
};
diff --git a/ydb/services/bg_tasks/service.cpp b/ydb/services/bg_tasks/service.cpp
index 9c1b0dec5b0..9fbd6fcec2e 100644
--- a/ydb/services/bg_tasks/service.cpp
+++ b/ydb/services/bg_tasks/service.cpp
@@ -6,4 +6,12 @@ NActors::TActorId MakeServiceId(const ui32 nodeId) {
return NActors::TActorId(nodeId, "SrvcBgrdTask");
}
+void TServiceOperator::Register() {
+ Singleton<TServiceOperator>()->EnabledFlag = true;
+}
+
+bool TServiceOperator::IsEnabled() {
+ return Singleton<TServiceOperator>()->EnabledFlag;
+}
+
}
diff --git a/ydb/services/bg_tasks/service.h b/ydb/services/bg_tasks/service.h
index 0a022c60a56..308836037fc 100644
--- a/ydb/services/bg_tasks/service.h
+++ b/ydb/services/bg_tasks/service.h
@@ -70,6 +70,15 @@ public:
using TBase::TBase;
};
+class TServiceOperator {
+private:
+ friend class TExecutor;
+ bool EnabledFlag = false;
+ static void Register();
+public:
+ static bool IsEnabled();
+};
+
NActors::TActorId MakeServiceId(const ui32 nodeId);
}
diff --git a/ydb/services/metadata/abstract/CMakeLists.txt b/ydb/services/metadata/abstract/CMakeLists.txt
index 7ea18da9cfd..e635a4400ea 100644
--- a/ydb/services/metadata/abstract/CMakeLists.txt
+++ b/ydb/services/metadata/abstract/CMakeLists.txt
@@ -13,6 +13,7 @@ target_link_libraries(services-metadata-abstract PUBLIC
yutil
ydb-library-accessor
cpp-actors-core
+ services-metadata-request
api-protos
ydb-core-base
)
diff --git a/ydb/services/metadata/service.cpp b/ydb/services/metadata/service.cpp
index 736d41bfc56..70714c1c43b 100644
--- a/ydb/services/metadata/service.cpp
+++ b/ydb/services/metadata/service.cpp
@@ -2,8 +2,8 @@
namespace NKikimr::NMetadataProvider {
-NActors::TActorId MakeServiceId(ui32 node) {
- return NActors::TActorId(node, "SrvcMetaData");
+NActors::TActorId MakeServiceId(const ui32 nodeId) {
+ return NActors::TActorId(nodeId, "SrvcMetaData");
}
}
diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h
index 888ec5763b4..858986f779a 100644
--- a/ydb/services/metadata/service.h
+++ b/ydb/services/metadata/service.h
@@ -27,6 +27,6 @@ public:
}
};
-NActors::TActorId MakeServiceId(ui32 node = 0);
+NActors::TActorId MakeServiceId(const ui32 node);
}