diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-01 10:46:47 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-01 11:02:05 +0300 |
commit | 11dd57fd9a283200b4512930ce11839b8dbc0aac (patch) | |
tree | e66436f29feebf22ddc28fe1460f84739a784a40 | |
parent | 5b1a1c2bce932d8302338706e484db8f613ac416 (diff) | |
download | ydb-11dd57fd9a283200b4512930ce11839b8dbc0aac.tar.gz |
KIKIMR-19505: start task with resource broker through special actor and interface
23 files changed, 480 insertions, 128 deletions
diff --git a/.mapping.json b/.mapping.json index 12598f8590..23aacb9b91 100644 --- a/.mapping.json +++ b/.mapping.json @@ -5384,6 +5384,11 @@ "ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt":"", "ydb/core/tx/columnshard/operations/CMakeLists.txt":"", "ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt":"", + "ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt":"", "ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt":"", "ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt":"", "ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/tablet/resource_broker.cpp b/ydb/core/tablet/resource_broker.cpp index 1124484e95..eef35dc1a8 100644 --- a/ydb/core/tablet/resource_broker.cpp +++ b/ydb/core/tablet/resource_broker.cpp @@ -1269,6 +1269,9 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig() const ui64 KqpRmQueueCPU = 4; const ui64 KqpRmQueueMemory = 10ULL << 30; + const ui64 CSInsertCompactionMemoryLimit = 1ULL << 30; + const ui64 CSGeneralCompactionMemoryLimit = 3ULL << 30; + const ui64 TotalCPU = 20; const ui64 TotalMemory = 16ULL << 30; @@ -1305,6 +1308,18 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig() queue->MutableLimit()->SetCpu(3); queue = config.AddQueues(); + queue->SetName("QUEUE::CS::INDEXATION"); + queue->SetWeight(100); + queue->MutableLimit()->SetCpu(3); + queue->MutableLimit()->SetMemory(CSInsertCompactionMemoryLimit); + + queue = config.AddQueues(); + queue->SetName("QUEUE::CS::GENERAL"); + queue->SetWeight(100); + queue->MutableLimit()->SetCpu(3); + queue->MutableLimit()->SetMemory(CSGeneralCompactionMemoryLimit); + + queue = config.AddQueues(); queue->SetName("queue_transaction"); queue->SetWeight(100); queue->MutableLimit()->SetCpu(4); @@ -1386,6 +1401,16 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig() task->SetDefaultDuration(TDuration::Minutes(10).GetValue()); task = config.AddTasks(); + task->SetName("CS::INDEXATION"); + task->SetQueueName("QUEUE::CS::INDEXATION"); + task->SetDefaultDuration(TDuration::Minutes(10).GetValue()); + + task = config.AddTasks(); + task->SetName("CS::GENERAL"); + task->SetQueueName("QUEUE::CS::GENERAL"); + task->SetDefaultDuration(TDuration::Minutes(10).GetValue()); + + task = config.AddTasks(); task->SetName(NLocalDb::TransactionTaskName); task->SetQueueName("queue_transaction"); task->SetDefaultDuration(TDuration::Minutes(10).GetValue()); diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index d3e1e8a7d0..9f1deb1cd4 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) add_subdirectory(operations) +add_subdirectory(resource_subscriber) add_subdirectory(resources) add_subdirectory(splitter) add_subdirectory(ut_rw) @@ -55,6 +56,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-operations tx-columnshard-blobs_reader tx-columnshard-blobs_action + tx-columnshard-resource_subscriber core-tx-tiering tx-conveyor-usage tx-long_tx_service-public @@ -88,7 +90,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 5314837877..26c0b204e9 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) add_subdirectory(operations) +add_subdirectory(resource_subscriber) add_subdirectory(resources) add_subdirectory(splitter) add_subdirectory(ut_rw) @@ -56,6 +57,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-operations tx-columnshard-blobs_reader tx-columnshard-blobs_action + tx-columnshard-resource_subscriber core-tx-tiering tx-conveyor-usage tx-long_tx_service-public @@ -89,7 +91,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 5314837877..26c0b204e9 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) add_subdirectory(operations) +add_subdirectory(resource_subscriber) add_subdirectory(resources) add_subdirectory(splitter) add_subdirectory(ut_rw) @@ -56,6 +57,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-operations tx-columnshard-blobs_reader tx-columnshard-blobs_action + tx-columnshard-resource_subscriber core-tx-tiering tx-conveyor-usage tx-long_tx_service-public @@ -89,7 +91,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 0b48bc6853..8c97dad14a 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) add_subdirectory(operations) +add_subdirectory(resource_subscriber) add_subdirectory(resources) add_subdirectory(splitter) add_subdirectory(ut_rw) @@ -56,6 +57,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-operations tx-columnshard-blobs_reader tx-columnshard-blobs_action + tx-columnshard-resource_subscriber core-tx-tiering tx-conveyor-usage tx-long_tx_service-public @@ -89,7 +91,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/tables_manager.cpp diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index ae63a10f1a..b480b6d3ed 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -29,6 +29,7 @@ struct TEvPrivate { EvWriteDraft, EvGarbageCollectionFinished, EvTieringModified, + EvStartResourceUsageTask, EvEnd }; @@ -225,6 +226,23 @@ struct TEvPrivate { } }; + TString GetBlobVerified(const TBlobRange& bRange) const { + for (auto&& i : Actions) { + for (auto&& b : i->GetBlobsForWrite()) { + if (bRange.GetBlobId() == b.first) { + AFL_VERIFY(bRange.Size + bRange.Offset <= b.second.size()); + if (bRange.Size == b.second.size()) { + return b.second; + } else { + return b.second.substr(bRange.Offset, bRange.Size); + } + } + } + } + AFL_VERIFY(false); + return ""; + } + TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, const NEvWrite::TWriteMeta& writeMeta) : PutResult(putResult) , WriteMeta(writeMeta) diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp deleted file mode 100644 index 21b2333bd0..0000000000 --- a/ydb/core/tx/columnshard/export_actor.cpp +++ /dev/null @@ -1,123 +0,0 @@ -#include "columnshard_impl.h" -#include "blob_cache.h" - -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <ydb/core/util/backoff.h> - -namespace NKikimr::NColumnShard { -namespace { - -class TExportActor : public TActorBootstrapped<TExportActor> { -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::TX_COLUMNSHARD_EXPORT_ACTOR; - } - - TExportActor(ui64 tabletId, const TActorId& parent, TAutoPtr<TEvPrivate::TEvExport> ev) - : TabletId(tabletId) - , Parent(parent) - , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) - , Event(ev.Release()) - { - Y_VERIFY(Event); - } - - void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) { - LOG_S_TRACE("TEvReadBlobRangeResult (waiting " << BlobsToRead.size() << ") at tablet " << TabletId << " (export)"); - - auto& event = *ev->Get(); - const TUnifiedBlobId& blobId = event.BlobRange.BlobId; - if (event.Status != NKikimrProto::EReplyStatus::OK) { - LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId - << " status " << NKikimrProto::EReplyStatus_Name(event.Status) - << " at tablet " << TabletId << " (export)"); - - BlobsToRead.erase(blobId); - Event->AddResult(blobId, blobId.ToStringNew(), true, - TStringBuilder() << "cannot read, status " << NKikimrProto::EReplyStatus_Name(event.Status)); - return; - } - - TString blobData = event.Data; - Y_VERIFY(blobData.size() == blobId.BlobSize()); - - if (!BlobsToRead.contains(blobId)) { - return; - } - - BlobsToRead.erase(blobId); - - Y_VERIFY(Event); - { - auto it = Event->Blobs.find(blobId); - Y_VERIFY(it != Event->Blobs.end()); - it->second = blobData; - } - - if (BlobsToRead.empty()) { - SendResultAndDie(ctx); - } - } - - void Bootstrap(const TActorContext& /*ctx*/) { - LOG_S_DEBUG("Exporting " << Event->Blobs.size() << " blobs at tablet " << TabletId); - - SendReads(); - Become(&TThis::StateWait); - } - - STFUNC(StateWait) { - switch (ev->GetTypeRewrite()) { - HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle); - default: - break; - } - } - -private: - ui64 TabletId; - TActorId Parent; - TActorId BlobCacheActorId; - std::unique_ptr<TEvPrivate::TEvExport> Event; - THashSet<TUnifiedBlobId> BlobsToRead; - - void SendReads() { - for (auto& [blobId, _] : Event->Blobs) { - BlobsToRead.emplace(blobId); - SendReadRequest(NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize())); - } - } - - void SendReadRequest(const NBlobCache::TBlobRange& blobRange) { - Y_VERIFY(!blobRange.Offset); - Y_VERIFY(blobRange.Size); - - NBlobCache::TReadBlobRangeOptions readOpts { - .CacheAfterRead = false, - .ForceFallback = false, - .IsBackgroud = true - }; - Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); - } - - void SendResultAndDie(const TActorContext& ctx) { - if (Event->Status == NKikimrProto::UNKNOWN) { - auto s3Actor = Event->DstActor; - Event->DstActor = Parent; - ctx.Send(s3Actor, Event.release()); - } else { - Y_VERIFY(Event->Status == NKikimrProto::ERROR); - Event->DstActor = Parent; - ctx.Send(Parent, Event.release()); - } - Die(ctx); - } -}; - -} // namespace - -IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev) { - return new TExportActor(tabletId, dstActor, ev); -} - -} diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..524c67a195 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-resource_subscriber) +target_link_libraries(tx-columnshard-resource_subscriber PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(tx-columnshard-resource_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/task.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/events.cpp +) diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..a03cb2188d --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-resource_subscriber) +target_link_libraries(tx-columnshard-resource_subscriber PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(tx-columnshard-resource_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/task.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/events.cpp +) diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..a03cb2188d --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-resource_subscriber) +target_link_libraries(tx-columnshard-resource_subscriber PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(tx-columnshard-resource_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/task.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/events.cpp +) diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..524c67a195 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-resource_subscriber) +target_link_libraries(tx-columnshard-resource_subscriber PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(tx-columnshard-resource_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/task.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resource_subscriber/events.cpp +) diff --git a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp new file mode 100644 index 0000000000..b91c13d16b --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp @@ -0,0 +1,47 @@ +#include "actor.h" + +namespace NKikimr::NOlap::NResourceBroker::NSubscribe { + +class TCookie: public TThrRefBase { +private: + YDB_READONLY(ui64, TaskIdentifier, 0); +public: + TCookie(const ui64 id) + : TaskIdentifier(id) + { + + } +}; + +void TActor::Handle(TEvStartTask::TPtr& ev) { + auto task = ev->Get()->GetTask(); + Y_VERIFY(task); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ask_resources")("task", task->DebugString()); + Tasks.emplace(++Counter, task); + Send(NKikimr::NResourceBroker::MakeResourceBrokerID(), new NKikimr::NResourceBroker::TEvResourceBroker::TEvSubmitTask( + task->GetName(), + {{task->GetCPUAllocation(), task->GetMemoryAllocation()}}, + task->GetType(), + task->GetPriority(), + new TCookie(Counter) + )); + task->GetContext().GetCounters()->OnRequest(task->GetMemoryAllocation()); +} + +void TActor::Handle(NKikimr::NResourceBroker::TEvResourceBroker::TEvResourceAllocated::TPtr& ev) { + auto it = Tasks.find(((TCookie*)ev->Get()->Cookie.Get())->GetTaskIdentifier()); + Y_VERIFY(it != Tasks.end()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "result_resources")("task_id", ev->Get()->TaskId)("task", it->second->DebugString()); + it->second->OnAllocationSuccess(ev->Get()->TaskId, SelfId()); + Tasks.erase(it); + it->second->GetContext().GetCounters()->OnReply(it->second->GetMemoryAllocation()); +} + +TActor::TActor(ui64 tabletId, const TActorId& parent) + : TabletId(tabletId) + , Parent(parent) +{ + +} + +} diff --git a/ydb/core/tx/columnshard/resource_subscriber/actor.h b/ydb/core/tx/columnshard/resource_subscriber/actor.h new file mode 100644 index 0000000000..3f3d08610d --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/actor.h @@ -0,0 +1,41 @@ +#pragma once + +#include "task.h" +#include "events.h" +#include <ydb/core/tablet/resource_broker.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NOlap::NResourceBroker::NSubscribe { + +class TActor: public TActorBootstrapped<TActor> { +private: + ui64 TabletId; + NActors::TActorId Parent; + THashMap<ui64, std::shared_ptr<ITask>> Tasks; + ui64 Counter = 0; +public: + static TAtomicCounter WaitingBlobsCount; + TActor(ui64 tabletId, const TActorId& parent); + + void Handle(TEvStartTask::TPtr& ev); + void Handle(NKikimr::NResourceBroker::TEvResourceBroker::TEvResourceAllocated::TPtr& ev); + + void Bootstrap() { + Become(&TThis::StateWait); + } + + STFUNC(StateWait) { + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent", Parent)); + switch (ev->GetTypeRewrite()) { + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway); + hFunc(TEvStartTask, Handle); + hFunc(NKikimr::NResourceBroker::TEvResourceBroker::TEvResourceAllocated, Handle); + default: + break; + } + } + + ~TActor() = default; +}; + +} diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.cpp b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp new file mode 100644 index 0000000000..be0794b9f6 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp @@ -0,0 +1,29 @@ +#include "counters.h" + +namespace NKikimr::NOlap::NResourceBroker::NSubscribe { + + +std::shared_ptr<TSubscriberTypeCounters> TSubscriberCounters::GetTypeCounters(const TString& resourceType) { + auto it = ResourceTypeCounters.find(resourceType); + if (it == ResourceTypeCounters.end()) { + it = ResourceTypeCounters.emplace(resourceType, std::make_shared<TSubscriberTypeCounters>(*this, resourceType)).first; + } + return it->second; +} + +TSubscriberTypeCounters::TSubscriberTypeCounters(const TSubscriberCounters& owner, const TString& resourceType) + : TBase(owner) +{ + DeepSubGroup("ResourceType", resourceType); + + RequestsCount = TBase::GetDeriviative("Requests/Count"); + RequestBytes = TBase::GetDeriviative("Requests/Bytes"); + + RepliesCount = TBase::GetDeriviative("Replies/Count"); + ReplyBytes = TBase::GetDeriviative("Replies/Bytes"); + + BytesAllocated = TBase::GetValueAutoAggregationsClient("Allocated/Bytes"); + CountAllocated = TBase::GetValueAutoAggregationsClient("Allocated/Count"); +} + +} diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.h b/ydb/core/tx/columnshard/resource_subscriber/counters.h new file mode 100644 index 0000000000..f6fafbeaca --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/counters.h @@ -0,0 +1,49 @@ +#pragma once +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <ydb/core/tx/columnshard/counters/common/owner.h> +#include <util/generic/hash.h> + +namespace NKikimr::NOlap::NResourceBroker::NSubscribe { + +class TSubscriberCounters; + +class TSubscriberTypeCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr RequestsCount; + NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; + + NMonitoring::TDynamicCounters::TCounterPtr RepliesCount; + NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes; + + YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TValueAggregationClient>, BytesAllocated); + YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TValueAggregationClient>, CountAllocated); +public: + TSubscriberTypeCounters(const TSubscriberCounters& owner, const TString& resourceType); + + void OnRequest(const ui64 bytes) const { + RequestsCount->Add(1); + RequestBytes->Add(bytes); + } + + void OnReply(const ui64 bytes) const { + RepliesCount->Add(1); + ReplyBytes->Add(bytes); + } +}; + +class TSubscriberCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + THashMap<TString, std::shared_ptr<TSubscriberTypeCounters>> ResourceTypeCounters; + TMutex Mutex; +public: + TSubscriberCounters() + : TBase("ResourcesSubscriber") + { + } + + std::shared_ptr<TSubscriberTypeCounters> GetTypeCounters(const TString& resourceType); +}; + +} diff --git a/ydb/core/tx/columnshard/resource_subscriber/events.cpp b/ydb/core/tx/columnshard/resource_subscriber/events.cpp new file mode 100644 index 0000000000..2060750c10 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NOlap::NResourceBroker::NSubscribe { + +} diff --git a/ydb/core/tx/columnshard/resource_subscriber/events.h b/ydb/core/tx/columnshard/resource_subscriber/events.h new file mode 100644 index 0000000000..f4209987d2 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/events.h @@ -0,0 +1,23 @@ +#pragma once + +#include "task.h" +#include <ydb/core/tx/columnshard/columnshard_private_events.h> +#include <ydb/library/accessor/accessor.h> + +#include <library/cpp/actors/core/event_local.h> + +namespace NKikimr::NOlap::NResourceBroker::NSubscribe { + +class TEvStartTask: public NActors::TEventLocal<TEvStartTask, NColumnShard::TEvPrivate::EEv::EvStartResourceUsageTask> { +private: + YDB_READONLY_DEF(std::shared_ptr<ITask>, Task); +public: + + explicit TEvStartTask(std::shared_ptr<ITask> task) + : Task(task) { + } + +}; + + +} diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.cpp b/ydb/core/tx/columnshard/resource_subscriber/task.cpp new file mode 100644 index 0000000000..8dc1fdc646 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/task.cpp @@ -0,0 +1,34 @@ +#include "task.h" +#include "events.h" +#include <ydb/core/tablet/resource_broker.h> +#include <library/cpp/actors/core/log.h> + +namespace NKikimr::NOlap::NResourceBroker::NSubscribe { + +void ITask::OnAllocationSuccess(const ui64 taskId, const NActors::TActorId& senderId) { + DoOnAllocationSuccess(std::make_shared<TResourcesGuard>(taskId, *this, senderId, Context)); +} + +void ITask::Start(const NActors::TActorId& actorId, const std::shared_ptr<ITask>& task) { + NActors::TActorContext::AsActorContext().Send(actorId, std::make_unique<TEvStartTask>(task)); +} + +TResourcesGuard::~TResourcesGuard() { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "free_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu); + auto ev = std::make_unique<IEventHandle>(NKikimr::NResourceBroker::MakeResourceBrokerID(), Sender, new NKikimr::NResourceBroker::TEvResourceBroker::TEvFinishTask(TaskId)); + NActors::TActorContext::AsActorContext().Send(std::move(ev)); + Context.GetCounters()->GetBytesAllocated()->Remove(Memory); +} + +TResourcesGuard::TResourcesGuard(const ui64 taskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context) + : TaskId(taskId) + , Sender(sender) + , Memory(task.GetMemoryAllocation()) + , Cpu(task.GetCPUAllocation()) + , Context(context) +{ + Context.GetCounters()->GetBytesAllocated()->Add(Memory); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "allocate_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu); +} + +} diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.h b/ydb/core/tx/columnshard/resource_subscriber/task.h new file mode 100644 index 0000000000..39ef7356f9 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/task.h @@ -0,0 +1,68 @@ +#pragma once +#include <ydb/core/tx/columnshard/counters/common/object_counter.h> +#include "counters.h" + +namespace NKikimr::NOlap::NResourceBroker::NSubscribe { + +class ITask; + +class TTaskContext { +private: + YDB_READONLY_DEF(std::shared_ptr<TSubscriberTypeCounters>, Counters); + YDB_READONLY_DEF(TString, TypeName); +public: + TTaskContext(const TString& typeName, const std::shared_ptr<TSubscriberCounters>& subscriberCounters) + : TypeName(typeName) + { + Counters = subscriberCounters->GetTypeCounters(TypeName); + } +}; + +class TResourcesGuard: public NColumnShard::TMonitoringObjectsCounter<TResourcesGuard> { +private: + const ui64 TaskId; + const NActors::TActorId Sender; + const ui64 Memory; + const ui32 Cpu; + const TTaskContext Context; +public: + TResourcesGuard(const ui64 taskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context); + ~TResourcesGuard(); +}; + +class ITask: public NColumnShard::TMonitoringObjectsCounter<ITask> { +private: + YDB_READONLY(ui32, CPUAllocation, 0); + YDB_READONLY(ui64, MemoryAllocation, 0); + YDB_READONLY_DEF(TString, Name); + YDB_READONLY_DEF(TString, Type); + YDB_ACCESSOR(ui64, Priority, 0); + TTaskContext Context; +protected: + virtual void DoOnAllocationSuccess(const std::shared_ptr<TResourcesGuard>& guard) = 0; +public: + ITask(const ui32 cpu, const ui64 memory, const TString& name, const TTaskContext& context) + : CPUAllocation(cpu) + , MemoryAllocation(memory) + , Name(name) + , Type(context.GetTypeName()) + , Context(context) + { + + } + + const TTaskContext& GetContext() const { + return Context; + } + + TString DebugString() const { + return TStringBuilder() << "cpu=" << CPUAllocation << ";mem=" << MemoryAllocation << ";name=" << Name << ";type=" << Type << ";priority=" << Priority << ";"; + } + + virtual ~ITask() = default; + void OnAllocationSuccess(const ui64 taskId, const NActors::TActorId& senderId); + + static void Start(const NActors::TActorId& actorId, const std::shared_ptr<ITask>& task); +}; + +} diff --git a/ydb/core/tx/columnshard/resource_subscriber/ya.make b/ydb/core/tx/columnshard/resource_subscriber/ya.make new file mode 100644 index 0000000000..0a71dd52f3 --- /dev/null +++ b/ydb/core/tx/columnshard/resource_subscriber/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + actor.cpp + counters.cpp + task.cpp + events.cpp +) + +PEERDIR( + ydb/core/protos + library/cpp/actors/core + ydb/core/tablet_flat +) + +END() diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index ad4d31864a..87a239982e 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -25,7 +25,6 @@ SRCS( columnshard_schema.cpp counters.cpp defs.cpp - export_actor.cpp read_actor.cpp write_actor.cpp tables_manager.cpp @@ -53,6 +52,7 @@ PEERDIR( ydb/core/tx/columnshard/operations ydb/core/tx/columnshard/blobs_reader ydb/core/tx/columnshard/blobs_action + ydb/core/tx/columnshard/resource_subscriber ydb/core/tx/tiering ydb/core/tx/conveyor/usage ydb/core/tx/long_tx_service/public |