aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-01 10:46:47 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-01 11:02:05 +0300
commit11dd57fd9a283200b4512930ce11839b8dbc0aac (patch)
treee66436f29feebf22ddc28fe1460f84739a784a40
parent5b1a1c2bce932d8302338706e484db8f613ac416 (diff)
downloadydb-11dd57fd9a283200b4512930ce11839b8dbc0aac.tar.gz
KIKIMR-19505: start task with resource broker through special actor and interface
-rw-r--r--.mapping.json5
-rw-r--r--ydb/core/tablet/resource_broker.cpp25
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h18
-rw-r--r--ydb/core/tx/columnshard/export_actor.cpp123
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/actor.cpp47
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/actor.h41
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/counters.cpp29
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/counters.h49
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/events.cpp5
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/events.h23
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/task.cpp34
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/task.h68
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/ya.make16
-rw-r--r--ydb/core/tx/columnshard/ya.make2
23 files changed, 480 insertions, 128 deletions
diff --git a/.mapping.json b/.mapping.json
index 12598f8590..23aacb9b91 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -5384,6 +5384,11 @@
"ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt":"",
"ydb/core/tx/columnshard/operations/CMakeLists.txt":"",
"ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt":"",
"ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt":"",
"ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/tablet/resource_broker.cpp b/ydb/core/tablet/resource_broker.cpp
index 1124484e95..eef35dc1a8 100644
--- a/ydb/core/tablet/resource_broker.cpp
+++ b/ydb/core/tablet/resource_broker.cpp
@@ -1269,6 +1269,9 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
const ui64 KqpRmQueueCPU = 4;
const ui64 KqpRmQueueMemory = 10ULL << 30;
+ const ui64 CSInsertCompactionMemoryLimit = 1ULL << 30;
+ const ui64 CSGeneralCompactionMemoryLimit = 3ULL << 30;
+
const ui64 TotalCPU = 20;
const ui64 TotalMemory = 16ULL << 30;
@@ -1305,6 +1308,18 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
queue->MutableLimit()->SetCpu(3);
queue = config.AddQueues();
+ queue->SetName("QUEUE::CS::INDEXATION");
+ queue->SetWeight(100);
+ queue->MutableLimit()->SetCpu(3);
+ queue->MutableLimit()->SetMemory(CSInsertCompactionMemoryLimit);
+
+ queue = config.AddQueues();
+ queue->SetName("QUEUE::CS::GENERAL");
+ queue->SetWeight(100);
+ queue->MutableLimit()->SetCpu(3);
+ queue->MutableLimit()->SetMemory(CSGeneralCompactionMemoryLimit);
+
+ queue = config.AddQueues();
queue->SetName("queue_transaction");
queue->SetWeight(100);
queue->MutableLimit()->SetCpu(4);
@@ -1386,6 +1401,16 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
task = config.AddTasks();
+ task->SetName("CS::INDEXATION");
+ task->SetQueueName("QUEUE::CS::INDEXATION");
+ task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
+
+ task = config.AddTasks();
+ task->SetName("CS::GENERAL");
+ task->SetQueueName("QUEUE::CS::GENERAL");
+ task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
+
+ task = config.AddTasks();
task->SetName(NLocalDb::TransactionTaskName);
task->SetQueueName("queue_transaction");
task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index d3e1e8a7d0..9f1deb1cd4 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
add_subdirectory(operations)
+add_subdirectory(resource_subscriber)
add_subdirectory(resources)
add_subdirectory(splitter)
add_subdirectory(ut_rw)
@@ -55,6 +56,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-operations
tx-columnshard-blobs_reader
tx-columnshard-blobs_action
+ tx-columnshard-resource_subscriber
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
@@ -88,7 +90,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 5314837877..26c0b204e9 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
add_subdirectory(operations)
+add_subdirectory(resource_subscriber)
add_subdirectory(resources)
add_subdirectory(splitter)
add_subdirectory(ut_rw)
@@ -56,6 +57,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-operations
tx-columnshard-blobs_reader
tx-columnshard-blobs_action
+ tx-columnshard-resource_subscriber
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
@@ -89,7 +91,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 5314837877..26c0b204e9 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
add_subdirectory(operations)
+add_subdirectory(resource_subscriber)
add_subdirectory(resources)
add_subdirectory(splitter)
add_subdirectory(ut_rw)
@@ -56,6 +57,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-operations
tx-columnshard-blobs_reader
tx-columnshard-blobs_action
+ tx-columnshard-resource_subscriber
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
@@ -89,7 +91,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 0b48bc6853..8c97dad14a 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
add_subdirectory(operations)
+add_subdirectory(resource_subscriber)
add_subdirectory(resources)
add_subdirectory(splitter)
add_subdirectory(ut_rw)
@@ -56,6 +57,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-operations
tx-columnshard-blobs_reader
tx-columnshard-blobs_action
+ tx-columnshard-resource_subscriber
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
@@ -89,7 +91,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index ae63a10f1a..b480b6d3ed 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -29,6 +29,7 @@ struct TEvPrivate {
EvWriteDraft,
EvGarbageCollectionFinished,
EvTieringModified,
+ EvStartResourceUsageTask,
EvEnd
};
@@ -225,6 +226,23 @@ struct TEvPrivate {
}
};
+ TString GetBlobVerified(const TBlobRange& bRange) const {
+ for (auto&& i : Actions) {
+ for (auto&& b : i->GetBlobsForWrite()) {
+ if (bRange.GetBlobId() == b.first) {
+ AFL_VERIFY(bRange.Size + bRange.Offset <= b.second.size());
+ if (bRange.Size == b.second.size()) {
+ return b.second;
+ } else {
+ return b.second.substr(bRange.Offset, bRange.Size);
+ }
+ }
+ }
+ }
+ AFL_VERIFY(false);
+ return "";
+ }
+
TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, const NEvWrite::TWriteMeta& writeMeta)
: PutResult(putResult)
, WriteMeta(writeMeta)
diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp
deleted file mode 100644
index 21b2333bd0..0000000000
--- a/ydb/core/tx/columnshard/export_actor.cpp
+++ /dev/null
@@ -1,123 +0,0 @@
-#include "columnshard_impl.h"
-#include "blob_cache.h"
-
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <ydb/core/util/backoff.h>
-
-namespace NKikimr::NColumnShard {
-namespace {
-
-class TExportActor : public TActorBootstrapped<TExportActor> {
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::TX_COLUMNSHARD_EXPORT_ACTOR;
- }
-
- TExportActor(ui64 tabletId, const TActorId& parent, TAutoPtr<TEvPrivate::TEvExport> ev)
- : TabletId(tabletId)
- , Parent(parent)
- , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
- , Event(ev.Release())
- {
- Y_VERIFY(Event);
- }
-
- void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) {
- LOG_S_TRACE("TEvReadBlobRangeResult (waiting " << BlobsToRead.size() << ") at tablet " << TabletId << " (export)");
-
- auto& event = *ev->Get();
- const TUnifiedBlobId& blobId = event.BlobRange.BlobId;
- if (event.Status != NKikimrProto::EReplyStatus::OK) {
- LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId
- << " status " << NKikimrProto::EReplyStatus_Name(event.Status)
- << " at tablet " << TabletId << " (export)");
-
- BlobsToRead.erase(blobId);
- Event->AddResult(blobId, blobId.ToStringNew(), true,
- TStringBuilder() << "cannot read, status " << NKikimrProto::EReplyStatus_Name(event.Status));
- return;
- }
-
- TString blobData = event.Data;
- Y_VERIFY(blobData.size() == blobId.BlobSize());
-
- if (!BlobsToRead.contains(blobId)) {
- return;
- }
-
- BlobsToRead.erase(blobId);
-
- Y_VERIFY(Event);
- {
- auto it = Event->Blobs.find(blobId);
- Y_VERIFY(it != Event->Blobs.end());
- it->second = blobData;
- }
-
- if (BlobsToRead.empty()) {
- SendResultAndDie(ctx);
- }
- }
-
- void Bootstrap(const TActorContext& /*ctx*/) {
- LOG_S_DEBUG("Exporting " << Event->Blobs.size() << " blobs at tablet " << TabletId);
-
- SendReads();
- Become(&TThis::StateWait);
- }
-
- STFUNC(StateWait) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
- default:
- break;
- }
- }
-
-private:
- ui64 TabletId;
- TActorId Parent;
- TActorId BlobCacheActorId;
- std::unique_ptr<TEvPrivate::TEvExport> Event;
- THashSet<TUnifiedBlobId> BlobsToRead;
-
- void SendReads() {
- for (auto& [blobId, _] : Event->Blobs) {
- BlobsToRead.emplace(blobId);
- SendReadRequest(NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()));
- }
- }
-
- void SendReadRequest(const NBlobCache::TBlobRange& blobRange) {
- Y_VERIFY(!blobRange.Offset);
- Y_VERIFY(blobRange.Size);
-
- NBlobCache::TReadBlobRangeOptions readOpts {
- .CacheAfterRead = false,
- .ForceFallback = false,
- .IsBackgroud = true
- };
- Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
- }
-
- void SendResultAndDie(const TActorContext& ctx) {
- if (Event->Status == NKikimrProto::UNKNOWN) {
- auto s3Actor = Event->DstActor;
- Event->DstActor = Parent;
- ctx.Send(s3Actor, Event.release());
- } else {
- Y_VERIFY(Event->Status == NKikimrProto::ERROR);
- Event->DstActor = Parent;
- ctx.Send(Parent, Event.release());
- }
- Die(ctx);
- }
-};
-
-} // namespace
-
-IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev) {
- return new TExportActor(tabletId, dstActor, ev);
-}
-
-}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..524c67a195
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-resource_subscriber)
+target_link_libraries(tx-columnshard-resource_subscriber PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(tx-columnshard-resource_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/events.cpp
+)
diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..a03cb2188d
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-resource_subscriber)
+target_link_libraries(tx-columnshard-resource_subscriber PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(tx-columnshard-resource_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/events.cpp
+)
diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..a03cb2188d
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-resource_subscriber)
+target_link_libraries(tx-columnshard-resource_subscriber PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(tx-columnshard-resource_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/events.cpp
+)
diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..524c67a195
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-resource_subscriber)
+target_link_libraries(tx-columnshard-resource_subscriber PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(tx-columnshard-resource_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/events.cpp
+)
diff --git a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
new file mode 100644
index 0000000000..b91c13d16b
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
@@ -0,0 +1,47 @@
+#include "actor.h"
+
+namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
+
+class TCookie: public TThrRefBase {
+private:
+ YDB_READONLY(ui64, TaskIdentifier, 0);
+public:
+ TCookie(const ui64 id)
+ : TaskIdentifier(id)
+ {
+
+ }
+};
+
+void TActor::Handle(TEvStartTask::TPtr& ev) {
+ auto task = ev->Get()->GetTask();
+ Y_VERIFY(task);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ask_resources")("task", task->DebugString());
+ Tasks.emplace(++Counter, task);
+ Send(NKikimr::NResourceBroker::MakeResourceBrokerID(), new NKikimr::NResourceBroker::TEvResourceBroker::TEvSubmitTask(
+ task->GetName(),
+ {{task->GetCPUAllocation(), task->GetMemoryAllocation()}},
+ task->GetType(),
+ task->GetPriority(),
+ new TCookie(Counter)
+ ));
+ task->GetContext().GetCounters()->OnRequest(task->GetMemoryAllocation());
+}
+
+void TActor::Handle(NKikimr::NResourceBroker::TEvResourceBroker::TEvResourceAllocated::TPtr& ev) {
+ auto it = Tasks.find(((TCookie*)ev->Get()->Cookie.Get())->GetTaskIdentifier());
+ Y_VERIFY(it != Tasks.end());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "result_resources")("task_id", ev->Get()->TaskId)("task", it->second->DebugString());
+ it->second->OnAllocationSuccess(ev->Get()->TaskId, SelfId());
+ Tasks.erase(it);
+ it->second->GetContext().GetCounters()->OnReply(it->second->GetMemoryAllocation());
+}
+
+TActor::TActor(ui64 tabletId, const TActorId& parent)
+ : TabletId(tabletId)
+ , Parent(parent)
+{
+
+}
+
+}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/actor.h b/ydb/core/tx/columnshard/resource_subscriber/actor.h
new file mode 100644
index 0000000000..3f3d08610d
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/actor.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "task.h"
+#include "events.h"
+#include <ydb/core/tablet/resource_broker.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
+
+class TActor: public TActorBootstrapped<TActor> {
+private:
+ ui64 TabletId;
+ NActors::TActorId Parent;
+ THashMap<ui64, std::shared_ptr<ITask>> Tasks;
+ ui64 Counter = 0;
+public:
+ static TAtomicCounter WaitingBlobsCount;
+ TActor(ui64 tabletId, const TActorId& parent);
+
+ void Handle(TEvStartTask::TPtr& ev);
+ void Handle(NKikimr::NResourceBroker::TEvResourceBroker::TEvResourceAllocated::TPtr& ev);
+
+ void Bootstrap() {
+ Become(&TThis::StateWait);
+ }
+
+ STFUNC(StateWait) {
+ TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent", Parent));
+ switch (ev->GetTypeRewrite()) {
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
+ hFunc(TEvStartTask, Handle);
+ hFunc(NKikimr::NResourceBroker::TEvResourceBroker::TEvResourceAllocated, Handle);
+ default:
+ break;
+ }
+ }
+
+ ~TActor() = default;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.cpp b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
new file mode 100644
index 0000000000..be0794b9f6
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
@@ -0,0 +1,29 @@
+#include "counters.h"
+
+namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
+
+
+std::shared_ptr<TSubscriberTypeCounters> TSubscriberCounters::GetTypeCounters(const TString& resourceType) {
+ auto it = ResourceTypeCounters.find(resourceType);
+ if (it == ResourceTypeCounters.end()) {
+ it = ResourceTypeCounters.emplace(resourceType, std::make_shared<TSubscriberTypeCounters>(*this, resourceType)).first;
+ }
+ return it->second;
+}
+
+TSubscriberTypeCounters::TSubscriberTypeCounters(const TSubscriberCounters& owner, const TString& resourceType)
+ : TBase(owner)
+{
+ DeepSubGroup("ResourceType", resourceType);
+
+ RequestsCount = TBase::GetDeriviative("Requests/Count");
+ RequestBytes = TBase::GetDeriviative("Requests/Bytes");
+
+ RepliesCount = TBase::GetDeriviative("Replies/Count");
+ ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
+
+ BytesAllocated = TBase::GetValueAutoAggregationsClient("Allocated/Bytes");
+ CountAllocated = TBase::GetValueAutoAggregationsClient("Allocated/Count");
+}
+
+}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.h b/ydb/core/tx/columnshard/resource_subscriber/counters.h
new file mode 100644
index 0000000000..f6fafbeaca
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/counters.h
@@ -0,0 +1,49 @@
+#pragma once
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+#include <util/generic/hash.h>
+
+namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
+
+class TSubscriberCounters;
+
+class TSubscriberTypeCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
+
+ NMonitoring::TDynamicCounters::TCounterPtr RepliesCount;
+ NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
+
+ YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TValueAggregationClient>, BytesAllocated);
+ YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TValueAggregationClient>, CountAllocated);
+public:
+ TSubscriberTypeCounters(const TSubscriberCounters& owner, const TString& resourceType);
+
+ void OnRequest(const ui64 bytes) const {
+ RequestsCount->Add(1);
+ RequestBytes->Add(bytes);
+ }
+
+ void OnReply(const ui64 bytes) const {
+ RepliesCount->Add(1);
+ ReplyBytes->Add(bytes);
+ }
+};
+
+class TSubscriberCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ THashMap<TString, std::shared_ptr<TSubscriberTypeCounters>> ResourceTypeCounters;
+ TMutex Mutex;
+public:
+ TSubscriberCounters()
+ : TBase("ResourcesSubscriber")
+ {
+ }
+
+ std::shared_ptr<TSubscriberTypeCounters> GetTypeCounters(const TString& resourceType);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/events.cpp b/ydb/core/tx/columnshard/resource_subscriber/events.cpp
new file mode 100644
index 0000000000..2060750c10
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/events.cpp
@@ -0,0 +1,5 @@
+#include "events.h"
+
+namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
+
+}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/events.h b/ydb/core/tx/columnshard/resource_subscriber/events.h
new file mode 100644
index 0000000000..f4209987d2
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/events.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include "task.h"
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <ydb/library/accessor/accessor.h>
+
+#include <library/cpp/actors/core/event_local.h>
+
+namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
+
+class TEvStartTask: public NActors::TEventLocal<TEvStartTask, NColumnShard::TEvPrivate::EEv::EvStartResourceUsageTask> {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<ITask>, Task);
+public:
+
+ explicit TEvStartTask(std::shared_ptr<ITask> task)
+ : Task(task) {
+ }
+
+};
+
+
+}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.cpp b/ydb/core/tx/columnshard/resource_subscriber/task.cpp
new file mode 100644
index 0000000000..8dc1fdc646
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/task.cpp
@@ -0,0 +1,34 @@
+#include "task.h"
+#include "events.h"
+#include <ydb/core/tablet/resource_broker.h>
+#include <library/cpp/actors/core/log.h>
+
+namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
+
+void ITask::OnAllocationSuccess(const ui64 taskId, const NActors::TActorId& senderId) {
+ DoOnAllocationSuccess(std::make_shared<TResourcesGuard>(taskId, *this, senderId, Context));
+}
+
+void ITask::Start(const NActors::TActorId& actorId, const std::shared_ptr<ITask>& task) {
+ NActors::TActorContext::AsActorContext().Send(actorId, std::make_unique<TEvStartTask>(task));
+}
+
+TResourcesGuard::~TResourcesGuard() {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "free_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu);
+ auto ev = std::make_unique<IEventHandle>(NKikimr::NResourceBroker::MakeResourceBrokerID(), Sender, new NKikimr::NResourceBroker::TEvResourceBroker::TEvFinishTask(TaskId));
+ NActors::TActorContext::AsActorContext().Send(std::move(ev));
+ Context.GetCounters()->GetBytesAllocated()->Remove(Memory);
+}
+
+TResourcesGuard::TResourcesGuard(const ui64 taskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context)
+ : TaskId(taskId)
+ , Sender(sender)
+ , Memory(task.GetMemoryAllocation())
+ , Cpu(task.GetCPUAllocation())
+ , Context(context)
+{
+ Context.GetCounters()->GetBytesAllocated()->Add(Memory);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "allocate_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.h b/ydb/core/tx/columnshard/resource_subscriber/task.h
new file mode 100644
index 0000000000..39ef7356f9
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/task.h
@@ -0,0 +1,68 @@
+#pragma once
+#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
+#include "counters.h"
+
+namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
+
+class ITask;
+
+class TTaskContext {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<TSubscriberTypeCounters>, Counters);
+ YDB_READONLY_DEF(TString, TypeName);
+public:
+ TTaskContext(const TString& typeName, const std::shared_ptr<TSubscriberCounters>& subscriberCounters)
+ : TypeName(typeName)
+ {
+ Counters = subscriberCounters->GetTypeCounters(TypeName);
+ }
+};
+
+class TResourcesGuard: public NColumnShard::TMonitoringObjectsCounter<TResourcesGuard> {
+private:
+ const ui64 TaskId;
+ const NActors::TActorId Sender;
+ const ui64 Memory;
+ const ui32 Cpu;
+ const TTaskContext Context;
+public:
+ TResourcesGuard(const ui64 taskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context);
+ ~TResourcesGuard();
+};
+
+class ITask: public NColumnShard::TMonitoringObjectsCounter<ITask> {
+private:
+ YDB_READONLY(ui32, CPUAllocation, 0);
+ YDB_READONLY(ui64, MemoryAllocation, 0);
+ YDB_READONLY_DEF(TString, Name);
+ YDB_READONLY_DEF(TString, Type);
+ YDB_ACCESSOR(ui64, Priority, 0);
+ TTaskContext Context;
+protected:
+ virtual void DoOnAllocationSuccess(const std::shared_ptr<TResourcesGuard>& guard) = 0;
+public:
+ ITask(const ui32 cpu, const ui64 memory, const TString& name, const TTaskContext& context)
+ : CPUAllocation(cpu)
+ , MemoryAllocation(memory)
+ , Name(name)
+ , Type(context.GetTypeName())
+ , Context(context)
+ {
+
+ }
+
+ const TTaskContext& GetContext() const {
+ return Context;
+ }
+
+ TString DebugString() const {
+ return TStringBuilder() << "cpu=" << CPUAllocation << ";mem=" << MemoryAllocation << ";name=" << Name << ";type=" << Type << ";priority=" << Priority << ";";
+ }
+
+ virtual ~ITask() = default;
+ void OnAllocationSuccess(const ui64 taskId, const NActors::TActorId& senderId);
+
+ static void Start(const NActors::TActorId& actorId, const std::shared_ptr<ITask>& task);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/ya.make b/ydb/core/tx/columnshard/resource_subscriber/ya.make
new file mode 100644
index 0000000000..0a71dd52f3
--- /dev/null
+++ b/ydb/core/tx/columnshard/resource_subscriber/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ actor.cpp
+ counters.cpp
+ task.cpp
+ events.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ library/cpp/actors/core
+ ydb/core/tablet_flat
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index ad4d31864a..87a239982e 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -25,7 +25,6 @@ SRCS(
columnshard_schema.cpp
counters.cpp
defs.cpp
- export_actor.cpp
read_actor.cpp
write_actor.cpp
tables_manager.cpp
@@ -53,6 +52,7 @@ PEERDIR(
ydb/core/tx/columnshard/operations
ydb/core/tx/columnshard/blobs_reader
ydb/core/tx/columnshard/blobs_action
+ ydb/core/tx/columnshard/resource_subscriber
ydb/core/tx/tiering
ydb/core/tx/conveyor/usage
ydb/core/tx/long_tx_service/public