author    ivanmorozov333 <ivanmorozov@ydb.tech>  2025-05-25 10:42:32 +0300
committer GitHub <noreply@github.com>  2025-05-25 10:42:32 +0300
commit    d617cc14c32e50f51615fe1d3a4c783289b10641 (patch)
tree      855cc9a002c364649091f1f615c586b95892c22f
parent    e727aaf2a0bedfa37ddb8205a1f5d922b501eab0 (diff)
one conveyor for insert and compaction. draft. (#18776) (HEAD -> main)
 ydb/core/base/events.h                                      |   1
 ydb/core/protos/config.proto                                |  25
 ydb/core/tx/columnshard/engines/portions/data_accessor.cpp  |   1
 ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp    |   2
 ydb/core/tx/conveyor/usage/abstract.h                       |   8
 ydb/core/tx/conveyor/usage/service.h                        |  24
 ydb/core/tx/conveyor/ya.make                                |  11
 ydb/core/tx/conveyor_composite/service/category.cpp         |  81
 ydb/core/tx/conveyor_composite/service/category.h           |  50
 ydb/core/tx/conveyor_composite/service/common.cpp           |  37
 ydb/core/tx/conveyor_composite/service/common.h             | 127
 ydb/core/tx/conveyor_composite/service/counters.cpp         |  31
 ydb/core/tx/conveyor_composite/service/counters.h           |  79
 ydb/core/tx/conveyor_composite/service/events.cpp           |  23
 ydb/core/tx/conveyor_composite/service/events.h             | 117
 ydb/core/tx/conveyor_composite/service/manager.cpp          |   4
 ydb/core/tx/conveyor_composite/service/manager.h            |  78
 ydb/core/tx/conveyor_composite/service/process.cpp          |   5
 ydb/core/tx/conveyor_composite/service/process.h            | 122
 ydb/core/tx/conveyor_composite/service/scope.cpp            |  38
 ydb/core/tx/conveyor_composite/service/scope.h              |  71
 ydb/core/tx/conveyor_composite/service/service.cpp          |  69
 ydb/core/tx/conveyor_composite/service/service.h            |  67
 ydb/core/tx/conveyor_composite/service/worker.cpp           |  64
 ydb/core/tx/conveyor_composite/service/worker.h             |  69
 ydb/core/tx/conveyor_composite/service/workers_pool.cpp     |  85
 ydb/core/tx/conveyor_composite/service/workers_pool.h       | 100
 ydb/core/tx/conveyor_composite/service/ya.make              |  21
 ydb/core/tx/conveyor_composite/usage/common.cpp             |  18
 ydb/core/tx/conveyor_composite/usage/common.h               |  44
 ydb/core/tx/conveyor_composite/usage/config.cpp             | 206
 ydb/core/tx/conveyor_composite/usage/config.h               | 138
 ydb/core/tx/conveyor_composite/usage/events.cpp             |  15
 ydb/core/tx/conveyor_composite/usage/events.h               |  69
 ydb/core/tx/conveyor_composite/usage/service.cpp            |   5
 ydb/core/tx/conveyor_composite/usage/service.h              |  83
 ydb/core/tx/conveyor_composite/usage/ya.make                |  17
 ydb/core/tx/conveyor_composite/ut/ut_simple.cpp             | 425
 ydb/core/tx/conveyor_composite/ut/ya.make                   |  30
 ydb/core/tx/conveyor_composite/ya.make                      |  15
 ydb/core/tx/ya.make                                         |   2
41 files changed, 2468 insertions(+), 9 deletions(-)
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index 4e652a4f322..eb9010f205d 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -185,6 +185,7 @@ struct TKikimrEvents : TEvents {
ES_FEATURE_FLAGS = 4262,
ES_PRIORITY_QUEUE = 4263,
ES_SOLOMON_PROVIDER = 4264,
+ ES_CONVEYOR_COMPOSITE = 4265,
};
};
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 2baeb7f4f11..8630b12e5eb 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -654,6 +654,28 @@ message TConveyorConfig {
optional double WorkersCountDouble = 5;
}
+message TCompositeConveyorConfig {
+ message TCategory {
+ optional string Name = 1;
+ optional uint64 QueueSizeLimit = 2;
+ }
+
+ message TWorkerPoolCategoryLink {
+ optional string Category = 1;
+ optional double Weight = 2;
+ }
+
+ message TWorkersPool {
+ optional double WorkersCount = 1;
+ optional double DefaultFractionOfThreadsCount = 2;
+ repeated TWorkerPoolCategoryLink Links = 3;
+ }
+
+ optional bool Enabled = 1 [default = true];
+ repeated TWorkersPool WorkerPools = 2;
+ repeated TCategory Categories = 3;
+}
+
message TPrioritiesQueueConfig {
optional bool Enabled = 1 [default = true];
optional uint32 Limit = 2 [default = 32];
@@ -1116,7 +1138,7 @@ message TQueryServiceConfig {
optional uint32 QueryTimeoutDefaultSeconds = 19 [default = 1800];
optional bool EnableMatchRecognize = 20 [default = false];
repeated string AvailableExternalDataSources = 22; // Ignored if AllExternalDataSourcesAreAvailable is true
- optional bool AllExternalDataSourcesAreAvailable = 23 [default = true];
+ optional bool AllExternalDataSourcesAreAvailable = 23 [default = true];
}
// Config describes immediate controls and allows
@@ -2375,6 +2397,7 @@ message TAppConfig {
optional NKikimrBlobStorage.TYamlConfig StoredConfigYaml = 105;
optional string StartupConfigYaml = 107;
optional string StartupStorageYaml = 108;
+ optional TCompositeConveyorConfig CompositeConveyorConfig = 109;
}
message TYdbVersion {
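
For orientation, a hypothetical textproto sketch of how the new TCompositeConveyorConfig message could be filled in; the category names follow the ESpecialTaskCategory string literals introduced later in this patch, while all numeric values are purely illustrative:

    CompositeConveyorConfig {
      Enabled: true
      Categories { Name: "insert" QueueSizeLimit: 1000 }
      Categories { Name: "compaction" QueueSizeLimit: 1000 }
      WorkerPools {
        WorkersCount: 4
        Links { Category: "insert" Weight: 1.0 }
        Links { Category: "compaction" Weight: 0.5 }
      }
    }
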
diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
index 07d4e6fe709..c58ae15bed8 100644
--- a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
@@ -651,6 +651,7 @@ void TPortionDataAccessor::FullValidation() const {
TBlobRange::Validate(PortionInfo->GetMeta().GetBlobIds(), i.GetBlobRange()).Validate();
blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified());
}
+ AFL_VERIFY(GetRecordsVerified().size());
for (auto&& i : GetIndexesVerified()) {
if (auto bRange = i.GetBlobRangeOptional()) {
TBlobRange::Validate(PortionInfo->GetMeta().GetBlobIds(), *bRange).Validate();
diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp b/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp
index dfa57b5bee8..329efccbd77 100644
--- a/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp
@@ -77,4 +77,4 @@ void TWriteAggregation::Flush(const ui64 tabletId) {
}
}
-} // namespace NKikimr::NColumnShard::NWritingPortions
+} // namespace NKikimr::NOlap::NWritingPortions
diff --git a/ydb/core/tx/conveyor/usage/abstract.h b/ydb/core/tx/conveyor/usage/abstract.h
index 909136d4263..c9fa9da33bc 100644
--- a/ydb/core/tx/conveyor/usage/abstract.h
+++ b/ydb/core/tx/conveyor/usage/abstract.h
@@ -19,6 +19,14 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr Success;
NMonitoring::TDynamicCounters::TCounterPtr SuccessDuration;
+ TTaskSignals(const NColumnShard::TCommonCountersOwner& baseObject, const TString& taskClassIdentifier)
+ : TBase(baseObject, "task_class_name", taskClassIdentifier) {
+ Fails = TBase::GetDeriviative("Fails");
+ FailsDuration = TBase::GetDeriviative("FailsDuration");
+ Success = TBase::GetDeriviative("Success");
+ SuccessDuration = TBase::GetDeriviative("SuccessDuration");
+ }
+
TTaskSignals(const TString& moduleId, const TString& taskClassIdentifier, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals = nullptr)
: TBase(moduleId, baseSignals) {
DeepSubGroup("task_class", taskClassIdentifier);
diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h
index 2acd2ede377..a943ec29b62 100644
--- a/ydb/core/tx/conveyor/usage/service.h
+++ b/ydb/core/tx/conveyor/usage/service.h
@@ -1,20 +1,21 @@
#pragma once
#include "config.h"
-#include <ydb/library/actors/core/actorid.h>
-#include <ydb/library/actors/core/actor.h>
+
#include <ydb/core/tx/conveyor/service/service.h>
#include <ydb/core/tx/conveyor/usage/events.h>
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/actorid.h>
+
namespace NKikimr::NConveyor {
class TAsyncTaskExecutor: public TActorBootstrapped<TAsyncTaskExecutor> {
private:
const std::shared_ptr<ITask> Task;
+
public:
TAsyncTaskExecutor(const std::shared_ptr<ITask>& task)
- : Task(task)
- {
-
+ : Task(task) {
}
void Bootstrap() {
@@ -23,6 +24,12 @@ public:
}
};
+enum class ESpecialTaskProcesses {
+ Insert = 1,
+ Compaction = 2,
+ Normalizer = 3
+};
+
template <class TConveyorPolicy>
class TServiceOperatorImpl {
private:
@@ -35,11 +42,15 @@ private:
Y_ABORT_UNLESS(TConveyorPolicy::Name.size() == 4);
return TConveyorPolicy::Name;
}
+
public:
static void AsyncTaskToExecute(const std::shared_ptr<ITask>& task) {
auto& context = NActors::TActorContext::AsActorContext();
context.Register(new TAsyncTaskExecutor(task));
}
+ static bool SendTaskToExecute(const std::shared_ptr<ITask>& task, const ESpecialTaskProcesses processType) {
+ return SendTaskToExecute(task, (ui64)processType);
+ }
static bool SendTaskToExecute(const std::shared_ptr<ITask>& task, const ui64 processId = 0) {
if (TSelf::IsEnabled() && NActors::TlsActivationContext) {
auto& context = NActors::TActorContext::AsActorContext();
@@ -71,7 +82,6 @@ public:
return TProcessGuard(externalProcessId, {});
}
}
-
};
class TScanConveyorPolicy {
@@ -96,4 +106,4 @@ using TScanServiceOperator = TServiceOperatorImpl<TScanConveyorPolicy>;
using TCompServiceOperator = TServiceOperatorImpl<TCompConveyorPolicy>;
using TInsertServiceOperator = TServiceOperatorImpl<TInsertConveyorPolicy>;
-}
+} // namespace NKikimr::NConveyor
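
The new SendTaskToExecute overload above maps the typed process enum onto the numeric process id; a minimal call-site sketch, assuming TMyTask stands in for some existing NKikimr::NConveyor::ITask implementation (not part of this patch):

    // Hypothetical call site; TMyTask is any ITask subclass.
    std::shared_ptr<NKikimr::NConveyor::ITask> task = std::make_shared<TMyTask>();
    // Route through the dedicated "insert" process instead of the default id 0:
    NKikimr::NConveyor::TInsertServiceOperator::SendTaskToExecute(
        task, NKikimr::NConveyor::ESpecialTaskProcesses::Insert);
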
diff --git a/ydb/core/tx/conveyor/ya.make b/ydb/core/tx/conveyor/ya.make
new file mode 100644
index 00000000000..e5c415537d4
--- /dev/null
+++ b/ydb/core/tx/conveyor/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+)
+
+PEERDIR(
+ ydb/core/tx/conveyor/service
+ ydb/core/tx/conveyor/usage
+)
+
+END()
diff --git a/ydb/core/tx/conveyor_composite/service/category.cpp b/ydb/core/tx/conveyor_composite/service/category.cpp
new file mode 100644
index 00000000000..4c87ae85d0d
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/category.cpp
@@ -0,0 +1,81 @@
+#include "category.h"
+
+namespace NKikimr::NConveyorComposite {
+
+bool TProcessCategory::HasTasks() const {
+ for (auto&& i : Scopes) {
+ if (i.second->HasTasks()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+void TProcessCategory::DoQuant(const TMonotonic newStart) {
+ CPUUsage->Cut(newStart);
+ for (auto&& i : Scopes) {
+ i.second->DoQuant(newStart);
+ }
+}
+
+TWorkerTask TProcessCategory::ExtractTaskWithPrediction() {
+ std::shared_ptr<TProcessScope> scopeMin;
+ TDuration dMin;
+ for (auto&& [_, scope] : Scopes) {
+ if (!scope->HasTasks()) {
+ continue;
+ }
+ const TDuration d = scope->GetCPUUsage()->CalcWeight(scope->GetWeight());
+ if (!scopeMin || d < dMin) {
+ dMin = d;
+ scopeMin = scope;
+ }
+ }
+ AFL_VERIFY(scopeMin);
+ return scopeMin->ExtractTaskWithPrediction();
+}
+
+TProcessScope& TProcessCategory::MutableProcessScope(const TString& scopeName) {
+ auto it = Scopes.find(scopeName);
+ AFL_VERIFY(it != Scopes.end())("cat", GetCategory())("scope", scopeName);
+ return *it->second;
+}
+
+TProcessScope* TProcessCategory::MutableProcessScopeOptional(const TString& scopeName) {
+ auto it = Scopes.find(scopeName);
+ if (it != Scopes.end()) {
+ return it->second.get();
+ } else {
+ return nullptr;
+ }
+}
+
+TProcessScope& TProcessCategory::RegisterScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits) {
+ TCPUGroup::TPtr cpuGroup = std::make_shared<TCPUGroup>(processCpuLimits.GetCPUGroupThreadsLimitDef(256));
+ auto info = Scopes.emplace(scopeId, std::make_shared<TProcessScope>(std::move(cpuGroup), CPUUsage));
+ AFL_VERIFY(info.second);
+ return *info.first->second;
+}
+
+TProcessScope& TProcessCategory::UpdateScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits) {
+ auto& scope = MutableProcessScope(scopeId);
+ scope.UpdateLimits(processCpuLimits);
+ return scope;
+}
+
+TProcessScope& TProcessCategory::UpsertScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits) {
+ if (Scopes.contains(scopeId)) {
+ return UpdateScope(scopeId, processCpuLimits);
+ } else {
+ return RegisterScope(scopeId, processCpuLimits);
+ }
+}
+
+void TProcessCategory::UnregisterScope(const TString& name) {
+ auto it = Scopes.find(name);
+ AFL_VERIFY(it != Scopes.end());
+ AFL_VERIFY(!it->second->GetProcessesCount());
+ Scopes.erase(it);
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/category.h b/ydb/core/tx/conveyor_composite/service/category.h
new file mode 100644
index 00000000000..78c8f9b6b61
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/category.h
@@ -0,0 +1,50 @@
+#pragma once
+#include "common.h"
+#include "counters.h"
+#include "scope.h"
+#include "worker.h"
+
+#include <ydb/library/accessor/positive_integer.h>
+
+namespace NKikimr::NConveyorComposite {
+
+class TProcessCategory: public TNonCopyable {
+private:
+ const ESpecialTaskCategory Category;
+ std::shared_ptr<TCPUUsage> CPUUsage = std::make_shared<TCPUUsage>(nullptr);
+ TPositiveControlInteger WaitingTasksCount;
+ YDB_READONLY_DEF(std::shared_ptr<TCategorySignals>, Counters);
+ THashMap<TString, std::shared_ptr<TProcessScope>> Scopes;
+ const NConfig::TCategory Config;
+
+public:
+ TProcessCategory(const NConfig::TCategory& config, TCounters& counters)
+ : Category(config.GetCategory())
+ , Config(config) {
+ Counters = counters.GetCategorySignals(Category);
+ }
+
+ ESpecialTaskCategory GetCategory() const {
+ return Category;
+ }
+
+ void PutTaskResult(TWorkerTaskResult&& result) {
+ const TString id = result.GetScopeId();
+ if (TProcessScope* scope = MutableProcessScopeOptional(id)) {
+ scope->PutTaskResult(std::move(result));
+ }
+ }
+
+ bool HasTasks() const;
+ void DoQuant(const TMonotonic newStart);
+ TWorkerTask ExtractTaskWithPrediction();
+ TProcessScope& MutableProcessScope(const TString& scopeName);
+ TProcessScope* MutableProcessScopeOptional(const TString& scopeName);
+ TProcessScope& RegisterScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits);
+ TProcessScope& UpsertScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits);
+
+ TProcessScope& UpdateScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits);
+ void UnregisterScope(const TString& name);
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/common.cpp b/ydb/core/tx/conveyor_composite/service/common.cpp
new file mode 100644
index 00000000000..46a9129dc63
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/common.cpp
@@ -0,0 +1,37 @@
+#include "common.h"
+
+#include <ydb/library/actors/core/log.h>
+
+namespace NKikimr::NConveyorComposite {
+
+void TCPUUsage::Exchange(const TDuration predicted, const TMonotonic start, const TMonotonic finish) {
+ Usage.emplace_back(TTaskCPUUsage(start, finish));
+ AFL_VERIFY(predicted <= PredictedDuration)("predicted_delta", predicted)("predicted_sum", PredictedDuration);
+ Duration += Usage.back().GetDuration();
+ PredictedDuration -= predicted;
+ if (Parent) {
+ Parent->Exchange(predicted, start, finish);
+ }
+}
+
+void TCPUUsage::Cut(const TMonotonic start) {
+ ui32 idx = 0;
+ while (idx < Usage.size()) {
+ AFL_VERIFY(Usage[idx].GetDuration() <= Duration);
+ Duration -= Usage[idx].GetDuration();
+ if (Usage[idx].GetFinish() <= start) {
+ std::swap(Usage[idx], Usage.back());
+ Usage.pop_back();
+ } else {
+ Usage[idx].Cut(start);
+ Duration += Usage[idx].GetDuration();
+ ++idx;
+ }
+ }
+}
+
+TCPUGroup::~TCPUGroup() {
+ AFL_VERIFY(ProcessesCount == 0);
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/common.h b/ydb/core/tx/conveyor_composite/service/common.h
new file mode 100644
index 00000000000..b35e49a17f4
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/common.h
@@ -0,0 +1,127 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/accessor/positive_integer.h>
+#include <ydb/library/actors/core/monotonic.h>
+#include <ydb/library/signals/owner.h>
+
+#include <util/datetime/base.h>
+
+namespace NKikimr::NConveyorComposite {
+
+class TTaskCPUUsage {
+private:
+ YDB_READONLY_DEF(TMonotonic, Start);
+ YDB_READONLY_DEF(TMonotonic, Finish);
+ YDB_READONLY_DEF(TDuration, Duration);
+
+public:
+ void Cut(const TMonotonic start) {
+ AFL_VERIFY(start < Finish);
+ if (Start <= start) {
+ Start = start;
+ }
+ }
+
+ TTaskCPUUsage(const TMonotonic start, const TMonotonic finish)
+ : Start(start)
+ , Finish(finish)
+ , Duration(finish - start) {
+ }
+};
+
+template <class T>
+class TAverageCalcer {
+private:
+ const ui32 Count = 100;
+ std::deque<T> Values;
+ T Sum = T();
+
+public:
+ TAverageCalcer(const ui32 count = 100)
+ : Count(count) {
+ }
+
+ void Add(const T value) {
+ Values.emplace_back(value);
+ Sum += value;
+ if (Values.size() > Count) {
+ Sum -= Values.front();
+ Values.pop_front();
+ }
+ }
+
+ T GetValue() const {
+ return Values.size() ? (Sum / Values.size()) : T();
+ }
+};
+
+class TCPUUsage {
+private:
+ std::deque<TTaskCPUUsage> Usage;
+ YDB_READONLY_DEF(TDuration, Duration);
+ YDB_READONLY_DEF(TDuration, PredictedDuration);
+ std::shared_ptr<TCPUUsage> Parent;
+
+public:
+ TCPUUsage(const std::shared_ptr<TCPUUsage>& parent)
+ : Parent(parent) {
+ }
+
+ TDuration CalcWeight(const double w) const {
+ if (w <= 0) {
+ return TDuration::Max();
+ } else {
+ return (Duration + PredictedDuration) * w;
+ }
+ }
+
+ void AddPredicted(const TDuration d) {
+ PredictedDuration += d;
+ if (Parent) {
+ Parent->AddPredicted(d);
+ }
+ }
+
+ void AddUsage(const TTaskCPUUsage& usage) {
+ Usage.emplace_back(usage);
+ Duration += usage.GetDuration();
+ if (Parent) {
+ Parent->AddUsage(usage);
+ }
+ }
+
+ void Exchange(const TDuration predicted, const TMonotonic start, const TMonotonic finish);
+ void Cut(const TMonotonic start);
+};
+
+class TCPUGroup {
+ YDB_ACCESSOR_DEF(double, CPUThreadsLimit);
+ YDB_ACCESSOR(double, Weight, 1);
+ TPositiveControlInteger ProcessesCount;
+
+public:
+ using TPtr = std::shared_ptr<TCPUGroup>;
+
+ TCPUGroup(const double cpuThreadsLimit, const double weight = 1)
+ : CPUThreadsLimit(cpuThreadsLimit)
+ , Weight(weight)
+ {
+ }
+
+ ~TCPUGroup();
+
+ ui32 GetProcessesCount() const {
+ return ProcessesCount.Val();
+ }
+
+ bool DecProcesses() {
+ --ProcessesCount;
+ return ProcessesCount == 0;
+ }
+
+ void IncProcesses() {
+ ++ProcessesCount;
+ }
+};
+
+} // namespace NKikimr::NConveyorComposite
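
TCPUUsage::CalcWeight above is the selection key used by all extraction loops in this patch: the entity with the smallest (Duration + PredictedDuration) * weight value is scheduled next, and a non-positive weight means never scheduled. A tiny standalone illustration of that rule with invented numbers:

    #include <cstdio>

    int main() {
        // (Duration + PredictedDuration) in microseconds, and the configured
        // weight, for two hypothetical scopes:
        const double usageA = 900.0,  weightA = 1.0;   // key: 900 * 1.0 = 900
        const double usageB = 1200.0, weightB = 0.5;   // key: 1200 * 0.5 = 600
        // The smaller key wins, so scope B is extracted first even though it
        // has consumed more raw CPU: its lower weight discounts its usage.
        std::printf("extract from %s\n",
                    usageA * weightA <= usageB * weightB ? "A" : "B");
        return 0;
    }
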
diff --git a/ydb/core/tx/conveyor_composite/service/counters.cpp b/ydb/core/tx/conveyor_composite/service/counters.cpp
new file mode 100644
index 00000000000..59e2f702088
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/counters.cpp
@@ -0,0 +1,31 @@
+#include "counters.h"
+
+namespace NKikimr::NConveyorComposite {
+
+TCounters::TCounters(const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals)
+ : TBase("CompositeConveyor/" + conveyorName, baseSignals)
+ , ProcessesCount(TBase::GetValue("Processes/Count"))
+ , WaitingQueueSize(TBase::GetValue("WaitingQueueSize"))
+ , WaitingQueueSizeLimit(TBase::GetValue("WaitingQueueSizeLimit"))
+ , AvailableWorkersCount(TBase::GetValue("AvailableWorkersCount"))
+ , WorkersCountLimit(TBase::GetValue("WorkersCountLimit"))
+ , AmountCPULimit(TBase::GetValue("AmountCPULimit"))
+ , IncomingRate(TBase::GetDeriviative("Incoming"))
+ , SolutionsRate(TBase::GetDeriviative("Solved"))
+ , OverlimitRate(TBase::GetDeriviative("Overlimit"))
+ , WaitWorkerRate(TBase::GetDeriviative("WaitWorker"))
+ , UseWorkerRate(TBase::GetDeriviative("UseWorker"))
+ , ChangeCPULimitRate(TBase::GetDeriviative("ChangeCPULimit"))
+ , WaitingHistogram(TBase::GetHistogram("Waiting/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50)))
+ , PackHistogram(TBase::GetHistogram("ExecutionPack/Count", NMonitoring::LinearHistogram(25, 1, 1)))
+ , PackExecuteHistogram(TBase::GetHistogram("PackExecute/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50)))
+ , TaskExecuteHistogram(TBase::GetHistogram("TaskExecute/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50)))
+ , SendBackHistogram(TBase::GetHistogram("SendBack/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50)))
+ , SendFwdHistogram(TBase::GetHistogram("SendForward/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50)))
+ , ReceiveTaskHistogram(TBase::GetHistogram("ReceiveTask/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50)))
+ , SendBackDuration(TBase::GetDeriviative("SendBack/Duration/Us"))
+ , SendFwdDuration(TBase::GetDeriviative("SendForward/Duration/Us"))
+ , ExecuteDuration(TBase::GetDeriviative("Execute/Duration/Us")) {
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/counters.h b/ydb/core/tx/conveyor_composite/service/counters.h
new file mode 100644
index 00000000000..e6b7f38fc8f
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/counters.h
@@ -0,0 +1,79 @@
+#pragma once
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+#include <ydb/core/tx/conveyor_composite/usage/common.h>
+
+#include <ydb/library/signals/owner.h>
+
+namespace NKikimr::NConveyorComposite {
+
+using TTaskSignals = NConveyor::TTaskSignals;
+
+class TCategorySignals: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ THashMap<TString, std::shared_ptr<TTaskSignals>> TaskClassSignals;
+ YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert);
+
+public:
+ TCategorySignals(NColumnShard::TCommonCountersOwner& base, const ESpecialTaskCategory cat)
+ : TBase(base, "category", ::ToString(cat))
+ , Category(cat) {
+ }
+
+ std::shared_ptr<TTaskSignals> GetTaskSignals(const TString& taskClassName) {
+ auto it = TaskClassSignals.find(taskClassName);
+ if (it == TaskClassSignals.end()) {
+ it = TaskClassSignals.emplace(taskClassName, std::make_shared<TTaskSignals>(*this, taskClassName)).first;
+ }
+ return it->second;
+ }
+};
+
+class TCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ THashMap<ESpecialTaskCategory, std::shared_ptr<TCategorySignals>> CategorySignals;
+
+public:
+ const ::NMonitoring::TDynamicCounters::TCounterPtr ProcessesCount;
+
+ const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSize;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSizeLimit;
+
+ const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressSize;
+
+ const ::NMonitoring::TDynamicCounters::TCounterPtr AvailableWorkersCount;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCountLimit;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr AmountCPULimit;
+
+ const ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRate;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr SolutionsRate;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr OverlimitRate;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr WaitWorkerRate;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr UseWorkerRate;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr ChangeCPULimitRate;
+
+ const ::NMonitoring::THistogramPtr WaitingHistogram;
+ const ::NMonitoring::THistogramPtr PackHistogram;
+ const ::NMonitoring::THistogramPtr PackExecuteHistogram;
+ const ::NMonitoring::THistogramPtr TaskExecuteHistogram;
+ const ::NMonitoring::THistogramPtr SendBackHistogram;
+ const ::NMonitoring::THistogramPtr SendFwdHistogram;
+ const ::NMonitoring::THistogramPtr ReceiveTaskHistogram;
+
+ const ::NMonitoring::TDynamicCounters::TCounterPtr SendBackDuration;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr SendFwdDuration;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr ExecuteDuration;
+
+ std::shared_ptr<TCategorySignals> GetCategorySignals(const ESpecialTaskCategory cat) {
+ auto it = CategorySignals.find(cat);
+ if (it == CategorySignals.end()) {
+ it = CategorySignals.emplace(cat, std::make_shared<TCategorySignals>(*this, cat)).first;
+ }
+ return it->second;
+ }
+
+ TCounters(const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals);
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/events.cpp b/ydb/core/tx/conveyor_composite/service/events.cpp
new file mode 100644
index 00000000000..68c63c027ca
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/events.cpp
@@ -0,0 +1,23 @@
+#include "events.h"
+
+#include <ydb/library/actors/core/log.h>
+
+namespace NKikimr::NConveyorComposite {
+
+TEvInternal::TEvTaskProcessedResult::TEvTaskProcessedResult(
+ std::vector<TWorkerTaskResult>&& results, const TDuration forwardSendDuration, const ui64 workerIdx, const ui64 workersPoolId)
+ : ForwardSendDuration(forwardSendDuration)
+ , Results(std::move(results))
+ , WorkerIdx(workerIdx)
+ , WorkersPoolId(workersPoolId) {
+ AFL_VERIFY(Results.size());
+}
+
+TWorkerTaskResult::TWorkerTaskResult(const TWorkerTaskContext& context, const TMonotonic start, const TMonotonic finish)
+ : TBase(context)
+ , Start(start)
+ , Finish(finish) {
+ AFL_VERIFY(Start <= Finish);
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/events.h b/ydb/core/tx/conveyor_composite/service/events.h
new file mode 100644
index 00000000000..dcafb80c675
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/events.h
@@ -0,0 +1,117 @@
+#pragma once
+#include "counters.h"
+
+#include <ydb/core/tx/conveyor_composite/usage/common.h>
+
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/core/monotonic.h>
+#include <ydb/library/conclusion/result.h>
+
+namespace NKikimr::NConveyorComposite {
+
+class TWorkerTaskContext {
+private:
+ YDB_READONLY(TMonotonic, CreateInstant, TMonotonic::Now());
+ YDB_READONLY_DEF(TDuration, PredictedDuration);
+ YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert);
+ YDB_READONLY_DEF(TString, ScopeId);
+ YDB_READONLY(ui64, ProcessId, 0);
+
+public:
+ TWorkerTaskContext(const TDuration prediction, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId)
+ : PredictedDuration(prediction)
+ , Category(category)
+ , ScopeId(scopeId)
+ , ProcessId(processId) {
+ }
+};
+
+class TWorkerTask;
+
+class TWorkerTaskResult: public TWorkerTaskContext {
+private:
+ using TBase = TWorkerTaskContext;
+ YDB_READONLY_DEF(TMonotonic, Start);
+ YDB_READONLY_DEF(TMonotonic, Finish);
+
+ TWorkerTaskResult(const TWorkerTaskContext& context, const TMonotonic start, const TMonotonic finish);
+ friend class TWorkerTask;
+
+public:
+ TDuration GetDuration() const {
+ return Finish - Start;
+ }
+
+};
+
+class TWorkerTask: public TWorkerTaskContext {
+private:
+ using TBase = TWorkerTaskContext;
+ YDB_READONLY_DEF(ITask::TPtr, Task);
+ YDB_READONLY_DEF(std::shared_ptr<TTaskSignals>, TaskSignals);
+
+public:
+ TWorkerTaskResult GetResult(const TMonotonic start, const TMonotonic finish) const {
+ return TWorkerTaskResult(*this, start, finish);
+ }
+
+ TWorkerTask(const ITask::TPtr& task, const TDuration prediction, const ESpecialTaskCategory category, const TString& scopeId,
+ const std::shared_ptr<TTaskSignals>& taskSignals, const ui64 processId)
+ : TBase(prediction, category, scopeId, processId)
+ , Task(task)
+ , TaskSignals(taskSignals) {
+ Y_ABORT_UNLESS(task);
+ }
+
+ bool operator<(const TWorkerTask& wTask) const {
+ return Task->GetPriority() < wTask.Task->GetPriority();
+ }
+};
+
+struct TEvInternal {
+ enum EEv {
+ EvNewTask = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvTaskProcessedResult,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expected EvEnd < EventSpaceEnd");
+
+ class TEvNewTask: public NActors::TEventLocal<TEvNewTask, EvNewTask> {
+ private:
+ std::vector<TWorkerTask> Tasks;
+ YDB_READONLY(TMonotonic, ConstructInstant, TMonotonic::Now());
+
+ public:
+ TEvNewTask() = default;
+
+ std::vector<TWorkerTask>&& ExtractTasks() {
+ return std::move(Tasks);
+ }
+
+ explicit TEvNewTask(std::vector<TWorkerTask>&& tasks)
+ : Tasks(std::move(tasks)) {
+ }
+ };
+
+ class TEvTaskProcessedResult: public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult> {
+ private:
+ using TBase = TConclusion<ITask::TPtr>;
+ YDB_READONLY_DEF(TDuration, ForwardSendDuration);
+ std::vector<TWorkerTaskResult> Results;
+ YDB_READONLY(TMonotonic, ConstructInstant, TMonotonic::Now());
+ YDB_READONLY(ui64, WorkerIdx, 0);
+ YDB_READONLY(ui64, WorkersPoolId, 0);
+
+ public:
+ std::vector<TWorkerTaskResult>&& DetachResults() {
+ return std::move(Results);
+ }
+
+ TEvTaskProcessedResult(
+ std::vector<TWorkerTaskResult>&& results, const TDuration forwardSendDuration, const ui64 workerIdx, const ui64 workersPoolId);
+ };
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/manager.cpp b/ydb/core/tx/conveyor_composite/service/manager.cpp
new file mode 100644
index 00000000000..d62b06ae24a
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/manager.cpp
@@ -0,0 +1,4 @@
+#include "category.h"
+
+namespace NKikimr::NConveyorComposite {
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/manager.h b/ydb/core/tx/conveyor_composite/service/manager.h
new file mode 100644
index 00000000000..30c0cc7666f
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/manager.h
@@ -0,0 +1,78 @@
+#pragma once
+#include "category.h"
+#include "workers_pool.h"
+
+#include <ydb/core/tx/conveyor_composite/usage/config.h>
+
+namespace NKikimr::NConveyorComposite {
+class TTasksManager {
+private:
+ const TString ConveyorName;
+ std::vector<std::shared_ptr<TWorkersPool>> WorkerPools;
+ std::vector<std::shared_ptr<TProcessCategory>> Categories;
+ const NActors::TActorId DistributorActorId;
+ const NConfig::TConfig Config;
+ TCounters& Counters;
+
+public:
+ TString DebugString() const {
+ TStringBuilder sb;
+ sb << "{";
+ sb << ConveyorName << ":";
+ for (auto&& wp : WorkerPools) {
+ sb << wp->GetMaxWorkerThreads() << ",";
+ }
+ sb << ";";
+ sb << "}";
+ return sb;
+ }
+
+ void DoQuant(const TMonotonic newStart) {
+ for (auto&& c : Categories) {
+ c->DoQuant(newStart);
+ }
+ }
+
+ TTasksManager(const TString& convName, const NConfig::TConfig& config, const NActors::TActorId distributorActorId, TCounters& counters)
+ : ConveyorName(convName)
+ , DistributorActorId(distributorActorId)
+ , Config(config)
+ , Counters(counters)
+ {
+ for (auto&& i : GetEnumAllValues<ESpecialTaskCategory>()) {
+ Categories.emplace_back(std::make_shared<TProcessCategory>(Config.GetCategoryConfig(i), Counters));
+ }
+ for (auto&& i : Config.GetWorkerPools()) {
+ WorkerPools.emplace_back(std::make_shared<TWorkersPool>(ConveyorName, distributorActorId, i, Counters, Categories));
+ }
+ }
+
+ TWorkersPool& MutableWorkersPool(const ui32 workersPoolId) {
+ AFL_VERIFY(workersPoolId < WorkerPools.size());
+ return *WorkerPools[workersPoolId];
+ }
+
+ [[nodiscard]] bool DrainTasks() {
+ bool result = false;
+ for (auto&& i : WorkerPools) {
+ if (i->DrainTasks()) {
+ result = true;
+ }
+ }
+ return result;
+ }
+
+ TProcessCategory& MutableCategoryVerified(const ESpecialTaskCategory category) {
+ AFL_VERIFY((ui64)category < Categories.size());
+ AFL_VERIFY(!!Categories[(ui64)category]);
+ return *Categories[(ui64)category];
+ }
+
+ const TProcessCategory& GetCategoryVerified(const ESpecialTaskCategory category) const {
+ AFL_VERIFY((ui64)category < Categories.size());
+ AFL_VERIFY(!!Categories[(ui64)category]);
+ return *Categories[(ui64)category];
+ }
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/process.cpp b/ydb/core/tx/conveyor_composite/service/process.cpp
new file mode 100644
index 00000000000..b4bf9a8611f
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/process.cpp
@@ -0,0 +1,5 @@
+#include "process.h"
+
+namespace NKikimr::NConveyorComposite {
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/process.h b/ydb/core/tx/conveyor_composite/service/process.h
new file mode 100644
index 00000000000..cbc58bb4de4
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/process.h
@@ -0,0 +1,122 @@
+#pragma once
+#include "common.h"
+#include "worker.h"
+
+#include <ydb/core/tx/conveyor_composite/usage/config.h>
+#include <ydb/core/tx/conveyor_composite/usage/events.h>
+
+#include <ydb/library/accessor/positive_integer.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/log.h>
+#include <ydb/library/signals/owner.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <queue>
+
+namespace NKikimr::NConveyorComposite {
+
+class TDequePriorityFIFO {
+private:
+ std::map<ui32, std::deque<TWorkerTask>> Tasks;
+ ui32 Size = 0;
+
+public:
+ void push(const TWorkerTask& task) {
+ Tasks[(ui32)task.GetTask()->GetPriority()].emplace_back(task);
+ ++Size;
+ }
+ TWorkerTask pop() {
+ Y_ABORT_UNLESS(Size);
+ auto result = std::move(Tasks.rbegin()->second.front());
+ Tasks.rbegin()->second.pop_front();
+ if (Tasks.rbegin()->second.size() == 0) {
+ Tasks.erase(--Tasks.end());
+ }
+ --Size;
+ return result;
+ }
+ ui32 size() const {
+ return Size;
+ }
+};
+
+class TProcessOrdered {
+private:
+ YDB_READONLY(ui64, ProcessId, 0);
+ YDB_READONLY(ui64, CPUTime, 0);
+
+public:
+ TProcessOrdered(const ui64 processId, const ui64 cpuTime)
+ : ProcessId(processId)
+ , CPUTime(cpuTime) {
+ }
+
+ bool operator<(const TProcessOrdered& item) const {
+ if (CPUTime < item.CPUTime) {
+ return true;
+ }
+ if (item.CPUTime < CPUTime) {
+ return false;
+ }
+ return ProcessId < item.ProcessId;
+ }
+};
+
+class TProcess: public TMoveOnly {
+private:
+ YDB_READONLY(ui64, ProcessId, 0);
+ YDB_READONLY_DEF(std::shared_ptr<TCPUUsage>, CPUUsage);
+ YDB_ACCESSOR_DEF(TDequePriorityFIFO, Tasks);
+ TAverageCalcer<TDuration> AverageTaskDuration;
+ ui32 LinksCount = 0;
+
+public:
+ void DoQuant(const TMonotonic newStart) {
+ CPUUsage->Cut(newStart);
+ }
+
+ bool HasTasks() const {
+ return Tasks.size();
+ }
+
+ TWorkerTask ExtractTaskWithPrediction() {
+ auto result = Tasks.pop();
+ CPUUsage->AddPredicted(result.GetPredictedDuration());
+ return result;
+ }
+
+ void PutTaskResult(TWorkerTaskResult&& result) {
+ CPUUsage->Exchange(result.GetPredictedDuration(), result.GetStart(), result.GetFinish());
+ AverageTaskDuration.Add(result.GetDuration());
+ }
+
+ [[nodiscard]] bool DecRegistration() {
+ AFL_VERIFY(LinksCount);
+ --LinksCount;
+ return LinksCount == 0;
+ }
+
+ double GetWeight() const {
+ return 1.0;
+ }
+
+ void IncRegistration() {
+ ++LinksCount;
+ }
+
+ TProcess(const ui64 processId, const std::shared_ptr<TCPUUsage>& scopeUsage)
+ : ProcessId(processId)
+ {
+ CPUUsage = std::make_shared<TCPUUsage>(scopeUsage);
+ IncRegistration();
+ }
+
+ void RegisterTask(const std::shared_ptr<ITask>& task, const TString& scopeId, const std::shared_ptr<TCategorySignals>& signals) {
+ TWorkerTask wTask(task, AverageTaskDuration.GetValue(), signals->GetCategory(), scopeId,
+ signals->GetTaskSignals(task->GetTaskClassIdentifier()), ProcessId);
+ Tasks.push(std::move(wTask));
+ }
+};
+
+} // namespace NKikimr::NConveyorComposite
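
TDequePriorityFIFO above always pops from the highest-priority bucket and preserves FIFO order within a bucket; a reduced standalone model of that behavior, using plain ints in place of TWorkerTask:

    #include <cassert>
    #include <deque>
    #include <iterator>
    #include <map>

    std::map<int, std::deque<int>> buckets;   // priority -> FIFO of tasks

    void push(int priority, int value) { buckets[priority].push_back(value); }

    int pop() {
        auto it = std::prev(buckets.end());   // highest priority bucket
        const int v = it->second.front();     // oldest element within it
        it->second.pop_front();
        if (it->second.empty()) buckets.erase(it);
        return v;
    }

    int main() {
        push(1, 10); push(2, 20); push(2, 21); push(1, 11);
        assert(pop() == 20);   // priority 2 drains first...
        assert(pop() == 21);   // ...in insertion order
        assert(pop() == 10);   // then priority 1, again FIFO
        assert(pop() == 11);
        return 0;
    }
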
diff --git a/ydb/core/tx/conveyor_composite/service/scope.cpp b/ydb/core/tx/conveyor_composite/service/scope.cpp
new file mode 100644
index 00000000000..0613ee69e20
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/scope.cpp
@@ -0,0 +1,38 @@
+#include "scope.h"
+
+namespace NKikimr::NConveyorComposite {
+
+bool TProcessScope::HasTasks() const {
+ for (auto&& i : Processes) {
+ if (i.second.HasTasks()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+TWorkerTask TProcessScope::ExtractTaskWithPrediction() {
+ TProcess* pMin = nullptr;
+ TDuration dMin = TDuration::Max();
+ for (auto&& [_, p] : Processes) {
+ if (!p.HasTasks()) {
+ continue;
+ }
+ const TDuration d = p.GetCPUUsage()->CalcWeight(p.GetWeight());
+ if (!pMin || d < dMin) {
+ pMin = &p;
+ dMin = d;
+ }
+ }
+ AFL_VERIFY(pMin)("size", Processes.size());
+ return pMin->ExtractTaskWithPrediction();
+}
+
+void TProcessScope::DoQuant(const TMonotonic newStart) {
+ CPUUsage->Cut(newStart);
+ for (auto&& i : Processes) {
+ i.second.DoQuant(newStart);
+ }
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/scope.h b/ydb/core/tx/conveyor_composite/service/scope.h
new file mode 100644
index 00000000000..b95a15dad20
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/scope.h
@@ -0,0 +1,71 @@
+#pragma once
+#include "common.h"
+#include "process.h"
+
+namespace NKikimr::NConveyorComposite {
+
+class TProcessScope: public TNonCopyable {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<TCPUUsage>, CPUUsage);
+ TCPUGroup::TPtr ScopeLimits;
+ THashMap<ui64, TProcess> Processes;
+
+public:
+ double GetWeight() const {
+ return ScopeLimits->GetWeight();
+ }
+
+ ui32 GetProcessesCount() const {
+ return Processes.size();
+ }
+ bool HasTasks() const;
+
+ TWorkerTask ExtractTaskWithPrediction();
+
+ void DoQuant(const TMonotonic newStart);
+
+ void UpdateLimits(const TCPULimitsConfig& processCpuLimits) {
+ ScopeLimits->SetCPUThreadsLimit(processCpuLimits.GetCPUGroupThreadsLimitDef(256));
+ ScopeLimits->SetWeight(processCpuLimits.GetWeight());
+ }
+
+ void PutTaskResult(TWorkerTaskResult&& result) {
+ const ui64 id = result.GetProcessId();
+ if (auto* process = MutableProcessOptional(id)) {
+ process->PutTaskResult(std::move(result));
+ }
+ }
+
+ TProcessScope(TCPUGroup::TPtr&& limits, const std::shared_ptr<TCPUUsage>& categoryScope)
+ : CPUUsage(std::make_shared<TCPUUsage>(categoryScope))
+ , ScopeLimits(std::move(limits)) {
+ }
+
+ TProcess& MutableProcessVerified(const ui64 processId) {
+ auto it = Processes.find(processId);
+ AFL_VERIFY(it != Processes.end());
+ return it->second;
+ }
+
+ TProcess* MutableProcessOptional(const ui64 processId) {
+ auto it = Processes.find(processId);
+ if (it != Processes.end()) {
+ return &it->second;
+ } else {
+ return nullptr;
+ }
+ }
+
+ void RegisterProcess(const ui64 processId) {
+ TProcess process(processId, CPUUsage);
+ AFL_VERIFY(Processes.emplace(processId, std::move(process)).second);
+ ScopeLimits->IncProcesses();
+ }
+
+ bool UnregisterProcess(const ui64 processId) {
+ AFL_VERIFY(Processes.erase(processId));
+ return ScopeLimits->DecProcesses();
+ }
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/service.cpp b/ydb/core/tx/conveyor_composite/service/service.cpp
new file mode 100644
index 00000000000..1195f7c1a08
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/service.cpp
@@ -0,0 +1,69 @@
+#include "manager.h"
+#include "service.h"
+
+#include <ydb/core/kqp/query_data/kqp_predictor.h>
+#include <ydb/core/tx/conveyor_composite/usage/service.h>
+
+namespace NKikimr::NConveyorComposite {
+
+TDistributor::TDistributor(
+ const NConfig::TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals)
+ : Config(config)
+ , ConveyorName(conveyorName)
+ , Counters(ConveyorName, conveyorSignals) {
+}
+
+TDistributor::~TDistributor() {
+}
+
+void TDistributor::Bootstrap() {
+ Manager = std::make_unique<TTasksManager>(ConveyorName, Config, SelfId(), Counters);
+ AFL_NOTICE(NKikimrServices::TX_CONVEYOR)("name", ConveyorName)("action", "conveyor_registered")("config", Config.DebugString())(
+ "actor_id", SelfId())("manager", Manager->DebugString());
+ Become(&TDistributor::StateMain);
+ TBase::Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(1));
+}
+
+void TDistributor::HandleMain(NActors::TEvents::TEvWakeup::TPtr& evExt) {
+ if (evExt->Get()->Tag == 1) {
+ Manager->DoQuant(TMonotonic::Now());
+ TBase::Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(1));
+ }
+}
+
+void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& evExt) {
+ TWorkersPool& workersPool = Manager->MutableWorkersPool(evExt->Get()->GetWorkersPoolId());
+ workersPool.AddDeliveryDuration(evExt->Get()->GetForwardSendDuration() + (TMonotonic::Now() - evExt->Get()->GetConstructInstant()));
+ workersPool.ReleaseWorker(evExt->Get()->GetWorkerIdx());
+ for (auto&& i : evExt->Get()->DetachResults()) {
+ workersPool.PutTaskResult(std::move(i));
+ }
+ if (workersPool.HasTasks()) {
+ AFL_VERIFY(workersPool.DrainTasks());
+ }
+}
+
+void TDistributor::HandleMain(TEvExecution::TEvRegisterProcess::TPtr& ev) {
+ Manager->MutableCategoryVerified(ev->Get()->GetCategory())
+ .UpsertScope(ev->Get()->GetScopeId(), ev->Get()->GetCPULimits())
+ .RegisterProcess(ev->Get()->GetProcessId());
+}
+
+void TDistributor::HandleMain(TEvExecution::TEvUnregisterProcess::TPtr& ev) {
+ auto* evData = ev->Get();
+ if (Manager->MutableCategoryVerified(evData->GetCategory())
+ .MutableProcessScope(evData->GetScopeId())
+ .UnregisterProcess(evData->GetProcessId())) {
+ Manager->MutableCategoryVerified(evData->GetCategory()).UnregisterScope(evData->GetScopeId());
+ }
+}
+
+void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
+ auto& cat = Manager->MutableCategoryVerified(ev->Get()->GetCategory());
+ cat.MutableProcessScope(ev->Get()->GetScopeId())
+ .MutableProcessVerified(ev->Get()->GetProcessId())
+ .RegisterTask(ev->Get()->GetTask(), ev->Get()->GetScopeId(), cat.GetCounters());
+ Y_UNUSED(Manager->DrainTasks());
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/service.h b/ydb/core/tx/conveyor_composite/service/service.h
new file mode 100644
index 00000000000..159fff0e416
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/service.h
@@ -0,0 +1,67 @@
+#pragma once
+#include "counters.h"
+#include "events.h"
+
+#include <ydb/core/tx/conveyor_composite/usage/config.h>
+#include <ydb/core/tx/conveyor_composite/usage/events.h>
+
+#include <ydb/library/accessor/positive_integer.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/log.h>
+#include <ydb/library/signals/owner.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <queue>
+
+namespace NKikimr::NConveyorComposite {
+class TTasksManager;
+class TDistributor: public TActorBootstrapped<TDistributor> {
+private:
+ using TBase = TActorBootstrapped<TDistributor>;
+ const NConfig::TConfig Config;
+ const TString ConveyorName = "common";
+ std::shared_ptr<TTasksManager> Manager;
+ TCounters Counters;
+ TMonotonic LastAddProcessInstant = TMonotonic::Now();
+
+ void HandleMain(TEvExecution::TEvNewTask::TPtr& ev);
+ void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev);
+ void HandleMain(TEvExecution::TEvRegisterProcess::TPtr& ev);
+ void HandleMain(TEvExecution::TEvUnregisterProcess::TPtr& ev);
+ void HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev);
+
+ void AddProcess(const ui64 processId, const TCPULimitsConfig& cpuLimits);
+
+ void AddCPUTime(const ui64 processId, const TDuration d);
+
+ TWorkerTask PopTask();
+
+ void PushTask(const TWorkerTask& task);
+
+ void ChangeAmountCPULimit(const double delta);
+
+public:
+ STATEFN(StateMain) {
+ // NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("name", ConveyorName)
+ // ("workers", Workers.size())("waiting", Waiting.size())("actor_id", SelfId());
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExecution::TEvNewTask, HandleMain);
+ hFunc(NActors::TEvents::TEvWakeup, HandleMain);
+ hFunc(TEvInternal::TEvTaskProcessedResult, HandleMain);
+ hFunc(TEvExecution::TEvRegisterProcess, HandleMain);
+ hFunc(TEvExecution::TEvUnregisterProcess, HandleMain);
+ default:
+ AFL_ERROR(NKikimrServices::TX_CONVEYOR)("problem", "unexpected event for task executor")("ev_type", ev->GetTypeName());
+ break;
+ }
+ }
+
+ TDistributor(const NConfig::TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals);
+
+ ~TDistributor();
+
+ void Bootstrap();
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/worker.cpp b/ydb/core/tx/conveyor_composite/service/worker.cpp
new file mode 100644
index 00000000000..3ab805fe3ee
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/worker.cpp
@@ -0,0 +1,64 @@
+#include "worker.h"
+
+namespace NKikimr::NConveyorComposite {
+
+TDuration TWorker::GetWakeupDuration() const {
+ AFL_VERIFY(ExecutionDuration);
+ return (*ExecutionDuration) * (1 - CPUSoftLimit) / CPUSoftLimit;
+}
+
+void TWorker::ExecuteTask(std::vector<TWorkerTask>&& workerTasks) {
+ AFL_VERIFY(!ExecutionDuration && Results.empty());
+ std::vector<TWorkerTaskResult> results;
+ results.reserve(workerTasks.size());
+ const TMonotonic startGlobal = TMonotonic::Now();
+ for (auto&& t : workerTasks) {
+ const TMonotonic start = TMonotonic::Now();
+ t.GetTask()->Execute(t.GetTaskSignals(), t.GetTask());
+ results.emplace_back(t.GetResult(start, TMonotonic::Now()));
+ }
+ if (CPUSoftLimit < 1) {
+ AFL_DEBUG(NKikimrServices::TX_CONVEYOR)("action", "to_wait_result")("id", SelfId())("count", workerTasks.size());
+ ExecutionDuration = TMonotonic::Now() - startGlobal;
+ Results = std::move(results);
+ Schedule(GetWakeupDuration(), new NActors::TEvents::TEvWakeup(CPULimitGeneration));
+ WaitWakeUp = true;
+ } else {
+ AFL_VERIFY(!!ForwardDuration);
+ AFL_DEBUG(NKikimrServices::TX_CONVEYOR)("action", "to_result")("id", SelfId())("count", Results.size())("d", TMonotonic::Now() - startGlobal);
+ TBase::Sender<TEvInternal::TEvTaskProcessedResult>(std::move(results), *ForwardDuration, WorkerIdx, WorkersPoolId).SendTo(DistributorId);
+ ForwardDuration.reset();
+ }
+}
+
+void TWorker::HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev) {
+ const auto evGeneration = ev->Get()->Tag;
+ AFL_VERIFY(evGeneration <= CPULimitGeneration);
+ if (evGeneration == CPULimitGeneration) {
+ OnWakeup();
+ }
+}
+
+void TWorker::OnWakeup() {
+ AFL_VERIFY(ExecutionDuration);
+ AFL_VERIFY(Results.size());
+ AFL_VERIFY(!!ForwardDuration);
+ AFL_DEBUG(NKikimrServices::TX_CONVEYOR)("action", "wake_up")("id", SelfId())("count", Results.size());
+ TBase::Sender<TEvInternal::TEvTaskProcessedResult>(std::move(Results), *ForwardDuration, WorkerIdx, WorkersPoolId).SendTo(DistributorId);
+ ForwardDuration.reset();
+ Results.clear();
+ ExecutionDuration.reset();
+
+ WaitWakeUp = false;
+}
+
+void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) {
+ AFL_VERIFY(!WaitWakeUp);
+ const TMonotonic now = TMonotonic::Now();
+ ForwardDuration = now - ev->Get()->GetConstructInstant();
+ SendFwdHistogram->Collect(ForwardDuration->MicroSeconds());
+ SendFwdDuration->Add(ForwardDuration->MicroSeconds());
+ ExecuteTask(ev->Get()->ExtractTasks());
+}
+
+} // namespace NKikimr::NConveyorComposite
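
GetWakeupDuration above implements the soft CPU limit: after executing for E, the worker sleeps E * (1 - s) / s, so the busy fraction over the whole execute-plus-sleep window is E / (E + E * (1 - s) / s) = s, exactly the soft limit. A numeric self-check with invented values:

    #include <cassert>
    #include <cmath>

    int main() {
        const double execMs = 30.0;   // time spent executing a task batch
        const double soft = 0.25;     // CPUSoftLimit: target busy fraction
        const double sleepMs = execMs * (1.0 - soft) / soft;   // 90 ms
        // Busy share across the execute+sleep window equals the soft limit:
        assert(std::fabs(execMs / (execMs + sleepMs) - soft) < 1e-9);
        return 0;
    }
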
diff --git a/ydb/core/tx/conveyor_composite/service/worker.h b/ydb/core/tx/conveyor_composite/service/worker.h
new file mode 100644
index 00000000000..c8209c1e797
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/worker.h
@@ -0,0 +1,69 @@
+#pragma once
+
+#include "counters.h"
+#include "events.h"
+
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/library/actors/core/event_local.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/actors/core/log.h>
+#include <ydb/library/conclusion/result.h>
+#include <ydb/library/services/services.pb.h>
+
+namespace NKikimr::NConveyorComposite {
+
+class TWorker: public NActors::TActorBootstrapped<TWorker> {
+private:
+ using TBase = NActors::TActorBootstrapped<TWorker>;
+ const double CPUHardLimit = 1;
+ YDB_READONLY(double, CPUSoftLimit, 1);
+ ui64 CPULimitGeneration = 0;
+ bool WaitWakeUp = false;
+ std::optional<TDuration> ForwardDuration;
+ const NActors::TActorId DistributorId;
+ const ui64 WorkerIdx;
+ const ui64 WorkersPoolId;
+ std::optional<TDuration> ExecutionDuration;
+ std::vector<TWorkerTaskResult> Results;
+ const ::NMonitoring::THistogramPtr SendFwdHistogram;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr SendFwdDuration;
+ TDuration GetWakeupDuration() const;
+ void ExecuteTask(std::vector<TWorkerTask>&& workerTasks);
+ void HandleMain(TEvInternal::TEvNewTask::TPtr& ev);
+ void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev);
+ void OnWakeup();
+
+public:
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvInternal::TEvNewTask, HandleMain);
+ hFunc(NActors::TEvents::TEvWakeup, HandleMain);
+ default:
+ ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite();
+ break;
+ }
+ }
+
+ void Bootstrap() {
+ Become(&TWorker::StateMain);
+ }
+
+ TWorker(const TString& conveyorName, const double cpuHardLimit, const NActors::TActorId& distributorId, const ui64 workerIdx,
+ const ui64 workersPoolId, const ::NMonitoring::THistogramPtr sendFwdHistogram,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr sendFwdDuration)
+ : TBase("CONVEYOR::" + conveyorName + "::WORKER")
+ , CPUHardLimit(cpuHardLimit)
+ , CPUSoftLimit(cpuHardLimit)
+ , DistributorId(distributorId)
+ , WorkerIdx(workerIdx)
+ , WorkersPoolId(workersPoolId)
+ , SendFwdHistogram(sendFwdHistogram)
+ , SendFwdDuration(sendFwdDuration) {
+ AFL_VERIFY(0 < CPUHardLimit);
+ AFL_VERIFY(CPUHardLimit <= 1);
+ }
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/workers_pool.cpp b/ydb/core/tx/conveyor_composite/service/workers_pool.cpp
new file mode 100644
index 00000000000..8a722186953
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/workers_pool.cpp
@@ -0,0 +1,85 @@
+#include "workers_pool.h"
+
+namespace NKikimr::NConveyorComposite {
+TWorkersPool::TWorkersPool(const TString& conveyorName, const NActors::TActorId& distributorId, const NConfig::TWorkersPool& config,
+ const TCounters& counters, const std::vector<std::shared_ptr<TProcessCategory>>& categories)
+ : WorkersCount(config.GetWorkersCount())
+ , Counters(counters) {
+ Workers.reserve(WorkersCount);
+ for (auto&& i : config.GetLinks()) {
+ AFL_VERIFY((ui64)i.GetCategory() < categories.size());
+ Processes.emplace_back(TWeightedCategory(i.GetWeight(), categories[(ui64)i.GetCategory()]));
+ }
+ AFL_VERIFY(Processes.size());
+ for (ui32 i = 0; i < WorkersCount; ++i) {
+ Workers.emplace_back(std::make_unique<TWorker>(conveyorName, config.GetWorkerCPUUsage(i), distributorId, i,
+ config.GetWorkersPoolId(), Counters.SendFwdHistogram, Counters.SendFwdDuration));
+ ActiveWorkersIdx.emplace_back(i);
+ }
+ AFL_VERIFY(WorkersCount)("name", conveyorName)("action", "conveyor_registered")("config", config.DebugString())("actor_id", distributorId)(
+ "count", WorkersCount);
+ Counters.WaitingQueueSizeLimit->Set(config.GetWorkersCountDouble());
+ Counters.AmountCPULimit->Set(0);
+ Counters.AvailableWorkersCount->Set(0);
+ Counters.WorkersCountLimit->Set(WorkersCount);
+}
+
+bool TWorkersPool::HasFreeWorker() const {
+ return !ActiveWorkersIdx.empty();
+}
+
+void TWorkersPool::RunTask(std::vector<TWorkerTask>&& tasksBatch) {
+ AFL_VERIFY(HasFreeWorker());
+ const auto workerIdx = ActiveWorkersIdx.back();
+ ActiveWorkersIdx.pop_back();
+ Counters.AvailableWorkersCount->Set(ActiveWorkersIdx.size());
+
+ auto& worker = Workers[workerIdx];
+ worker.OnStartTask();
+ TActivationContext::Send(worker.GetWorkerId(), std::make_unique<TEvInternal::TEvNewTask>(std::move(tasksBatch)));
+}
+
+void TWorkersPool::ReleaseWorker(const ui32 workerIdx) {
+ AFL_VERIFY(workerIdx < Workers.size());
+ Workers[workerIdx].OnStopTask();
+ ActiveWorkersIdx.emplace_back(workerIdx);
+ Counters.AvailableWorkersCount->Set(ActiveWorkersIdx.size());
+}
+
+bool TWorkersPool::DrainTasks() {
+ if (ActiveWorkersIdx.empty()) {
+ return false;
+ }
+ const auto predHeap = [](const TWeightedCategory& l, const TWeightedCategory& r) {
+ const bool hasL = l.GetCategory()->HasTasks();
+ const bool hasR = r.GetCategory()->HasTasks();
+ if (!hasL && !hasR) {
+ return false;
+ } else if (!hasL && hasR) {
+ return true;
+ } else if (hasL && !hasR) {
+ return false;
+ }
+ return r.GetCPUUsage()->CalcWeight(r.GetWeight()) < l.GetCPUUsage()->CalcWeight(l.GetWeight());
+ };
+ std::make_heap(Processes.begin(), Processes.end(), predHeap);
+ AFL_VERIFY(Processes.size());
+ bool newTask = false;
+ while (ActiveWorkersIdx.size() && Processes.front().GetCategory()->HasTasks()) {
+ TDuration predicted = TDuration::Zero();
+ std::vector<TWorkerTask> tasks;
+ while ((tasks.empty() || predicted < DeliveringDuration.GetValue() * 10) && Processes.front().GetCategory()->HasTasks()) {
+ std::pop_heap(Processes.begin(), Processes.end(), predHeap);
+ tasks.emplace_back(Processes.back().GetCategory()->ExtractTaskWithPrediction());
+ Processes.back().GetCPUUsage()->AddPredicted(tasks.back().GetPredictedDuration());
+ predicted += tasks.back().GetPredictedDuration();
+ std::push_heap(Processes.begin(), Processes.end(), predHeap);
+ }
+ newTask = true;
+ AFL_VERIFY(tasks.size());
+ RunTask(std::move(tasks));
+ }
+ return newTask;
+}
+
+} // namespace NKikimr::NConveyorComposite
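
The predHeap comparator above keeps a max-heap whose front is the category that both has pending tasks and shows the smallest weighted CPU usage; a reduced standalone version of the same ordering, with plain structs and invented values:

    #include <algorithm>
    #include <cassert>
    #include <vector>

    struct TCat {
        bool HasTasks;
        double WeightedUsage;   // a CalcWeight(...) result; lower runs sooner
    };

    int main() {
        // "Less-than" for a max-heap: l sorts below r if l has no tasks while
        // r does, or if both have tasks and l's weighted usage is larger.
        const auto pred = [](const TCat& l, const TCat& r) {
            if (!l.HasTasks || !r.HasTasks) return !l.HasTasks && r.HasTasks;
            return r.WeightedUsage < l.WeightedUsage;
        };
        std::vector<TCat> cats{ {true, 500.0}, {false, 0.0}, {true, 200.0} };
        std::make_heap(cats.begin(), cats.end(), pred);
        assert(cats.front().HasTasks && cats.front().WeightedUsage == 200.0);
        return 0;
    }
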
diff --git a/ydb/core/tx/conveyor_composite/service/workers_pool.h b/ydb/core/tx/conveyor_composite/service/workers_pool.h
new file mode 100644
index 00000000000..6fa9def6c9b
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/workers_pool.h
@@ -0,0 +1,100 @@
+#pragma once
+#include "category.h"
+#include "common.h"
+#include "worker.h"
+
+#include <ydb/library/actors/core/actorid.h>
+
+namespace NKikimr::NConveyorComposite {
+
+class TWeightedCategory {
+private:
+ YDB_READONLY(std::shared_ptr<TCPUUsage>, CPUUsage, std::make_shared<TCPUUsage>(nullptr));
+ YDB_READONLY_DEF(std::shared_ptr<TProcessCategory>, Category);
+ YDB_READONLY(double, Weight, 1);
+
+public:
+ TWeightedCategory(const double weight, const std::shared_ptr<TProcessCategory>& cat)
+ : Category(cat)
+ , Weight(weight)
+ {
+ AFL_VERIFY(cat);
+ AFL_VERIFY(Weight);
+ }
+};
+
+class TWorkersPool {
+private:
+ class TWorkerInfo {
+ YDB_READONLY(bool, RunningTask, false);
+ YDB_READONLY(TWorker*, Worker, nullptr);
+ YDB_READONLY_DEF(NActors::TActorId, WorkerId);
+
+ public:
+ explicit TWorkerInfo(std::unique_ptr<TWorker>&& worker)
+ : Worker(worker.get())
+ , WorkerId(TActivationContext::Register(worker.release())) {
+ }
+
+ void OnStartTask() {
+ AFL_VERIFY(!RunningTask);
+ RunningTask = true;
+ }
+
+ void OnStopTask() {
+ AFL_VERIFY(RunningTask);
+ RunningTask = false;
+ }
+ };
+
+ YDB_READONLY(ui32, WorkersCount, 0);
+ YDB_READONLY(double, MaxWorkerThreads, 0);
+ YDB_READONLY(double, AmountCPULimit, 0);
+ std::vector<TWeightedCategory> Processes;
+ std::vector<TWorkerInfo> Workers;
+ std::vector<ui32> ActiveWorkersIdx;
+ TCounters Counters;
+ TAverageCalcer<TDuration> DeliveringDuration;
+ std::deque<TDuration> DeliveryDurations;
+
+public:
+ static constexpr double Eps = 1e-6;
+ using TPtr = std::shared_ptr<TWorkersPool>;
+
+ TWorkersPool(const TString& conveyorName, const NActors::TActorId& distributorId, const NConfig::TWorkersPool& config,
+ const TCounters& counters, const std::vector<std::shared_ptr<TProcessCategory>>& categories);
+
+ bool HasTasks() const {
+ for (auto&& i : Processes) {
+ if (i.GetCategory()->HasTasks()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ [[nodiscard]] bool DrainTasks();
+
+ void AddDeliveryDuration(const TDuration d) {
+ DeliveringDuration.Add(d);
+ }
+
+ void PutTaskResult(TWorkerTaskResult&& result) {
+// const ui32 catIdx = (ui32)result.GetCategory();
+ for (auto&& i : Processes) {
+ if (i.GetCategory()->GetCategory() == result.GetCategory()) {
+// AFL_VERIFY(catIdx < Processes.size());
+// AFL_VERIFY(Processes[catIdx]);
+ i.GetCPUUsage()->Exchange(result.GetPredictedDuration(), result.GetStart(), result.GetFinish());
+ i.GetCategory()->PutTaskResult(std::move(result));
+ return;
+ }
+ }
+ AFL_VERIFY(false);
+ }
+ bool HasFreeWorker() const;
+ void RunTask(std::vector<TWorkerTask>&& tasksBatch);
+ void ReleaseWorker(const ui32 workerIdx);
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/service/ya.make b/ydb/core/tx/conveyor_composite/service/ya.make
new file mode 100644
index 00000000000..5ca71830673
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/service/ya.make
@@ -0,0 +1,21 @@
+LIBRARY()
+
+SRCS(
+ worker.cpp
+ service.cpp
+ process.cpp
+ common.cpp
+ manager.cpp
+ workers_pool.cpp
+ category.cpp
+ scope.cpp
+ counters.cpp
+ events.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/conveyor_composite/usage
+ ydb/core/protos
+)
+
+END()
diff --git a/ydb/core/tx/conveyor_composite/usage/common.cpp b/ydb/core/tx/conveyor_composite/usage/common.cpp
new file mode 100644
index 00000000000..aeaaaa4af88
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/common.cpp
@@ -0,0 +1,18 @@
+#include "common.h"
+#include "events.h"
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/log.h>
+
+namespace NKikimr::NConveyorComposite {
+
+void TProcessGuard::Finish() {
+ AFL_VERIFY(!Finished);
+ Finished = true;
+ if (ServiceActorId && NActors::TlsActivationContext) {
+ auto& context = NActors::TActorContext::AsActorContext();
+ context.Send(*ServiceActorId, new TEvExecution::TEvUnregisterProcess(Category, ScopeId, ProcessId));
+ }
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/usage/common.h b/ydb/core/tx/conveyor_composite/usage/common.h
new file mode 100644
index 00000000000..612b3aa511e
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/common.h
@@ -0,0 +1,44 @@
+#pragma once
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+
+namespace NKikimr::NConveyorComposite {
+using ITask = NConveyor::ITask;
+
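+// Task categories served by the composite conveyor; the quoted literals are the serialized names referenced from config
+// (see GENERATE_ENUM_SERIALIZATION(common.h) in the usage ya.make).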
+enum class ESpecialTaskCategory {
+ Insert = 0 /* "insert" */,
+ Compaction = 1 /* "compaction" */,
+ Normalizer = 2 /* "normalizer" */,
+ Scan = 3 /* "scan" */
+};
+
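+// RAII guard for a registered process: Finish() (or the destructor, if Finish() was never called) notifies the service
+// with TEvUnregisterProcess, so a process cannot stay registered after its owner is gone. Non-copyable to keep
+// unregistration single-shot.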
+class TProcessGuard: TNonCopyable {
+private:
+ const ESpecialTaskCategory Category;
+ const TString ScopeId;
+ const ui64 ProcessId;
+ bool Finished = false;
+ const std::optional<NActors::TActorId> ServiceActorId;
+
+public:
+ ui64 GetProcessId() const {
+ return ProcessId;
+ }
+
+ explicit TProcessGuard(
+ const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId, const std::optional<NActors::TActorId>& actorId)
+ : Category(category)
+ , ScopeId(scopeId)
+ , ProcessId(processId)
+ , ServiceActorId(actorId) {
+ }
+
+ void Finish();
+
+ ~TProcessGuard() {
+ if (!Finished) {
+ Finish();
+ }
+ }
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/usage/config.cpp b/ydb/core/tx/conveyor_composite/usage/config.cpp
new file mode 100644
index 00000000000..e0fbda75b34
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/config.cpp
@@ -0,0 +1,206 @@
+#include "config.h"
+
+#include <ydb/library/actors/core/log.h>
+
+#include <util/string/builder.h>
+#include <util/string/join.h>
+
+#include <cmath>
+
+namespace NKikimr::NConveyorComposite::NConfig {
+
+TConclusionStatus TConfig::DeserializeFromProto(const NKikimrConfig::TCompositeConveyorConfig& config, const ui64 usableThreadsCount) {
+    // Enabled by default unless the config explicitly disables it.
+    EnabledFlag = !config.HasEnabled() || config.GetEnabled();
+    for (auto&& i : GetEnumAllValues<ESpecialTaskCategory>()) {
+        Categories.emplace_back(TCategory(i));
+    }
+    TWorkersPool* defWorkersPool = nullptr;
+    // Reserve up front: defWorkersPool points into WorkerPools, so later emplace_back calls must not reallocate.
+    WorkerPools.reserve(1 + config.GetWorkerPools().size());
+    // A default pool is needed only when at least one category is left unconfigured (duplicates are rejected below).
+    if ((ui32)config.GetCategories().size() != GetEnumAllValues<ESpecialTaskCategory>().size()) {
+        TWorkersPool wp(WorkerPools.size(), usableThreadsCount);
+        WorkerPools.emplace_back(std::move(wp));
+        defWorkersPool = &WorkerPools.front();
+    }
+ std::set<ESpecialTaskCategory> usedCategories;
+ for (auto&& i : config.GetCategories()) {
+ TCategory cat(ESpecialTaskCategory::Insert);
+ auto conclusion = cat.DeserializeFromProto(i);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ if (!usedCategories.emplace(cat.GetCategory()).second) {
+ return TConclusionStatus::Fail("category " + ::ToString(cat.GetCategory()) + " duplication");
+ }
+ Categories[(ui64)cat.GetCategory()] = std::move(cat);
+ }
+ for (auto&& i : Categories) {
+ if (i.IsDefault()) {
+ AFL_VERIFY(defWorkersPool);
+ AFL_VERIFY(defWorkersPool->AddLink(i.GetCategory()));
+ AFL_VERIFY(i.AddWorkerPool(defWorkersPool->GetWorkersPoolId()));
+ }
+ }
+ for (auto&& i : config.GetWorkerPools()) {
+ TWorkersPool wp(WorkerPools.size());
+ auto conclusion = wp.DeserializeFromProto(i, usableThreadsCount);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ WorkerPools.emplace_back(std::move(wp));
+ for (auto&& link : WorkerPools.back().GetLinks()) {
+ AFL_VERIFY((ui64)link.GetCategory() < Categories.size());
+ auto& cat = Categories[(ui64)link.GetCategory()];
+ if (!cat.AddWorkerPool(WorkerPools.back().GetWorkersPoolId())) {
+ return TConclusionStatus::Fail("double link for category: " + ::ToString(link.GetCategory()));
+ }
+ }
+ }
+ for (auto&& c : Categories) {
+ if (c.GetWorkerPools().empty()) {
+ return TConclusionStatus::Fail("no worker pools for category: " + ::ToString(c.GetCategory()));
+ }
+ }
+ return TConclusionStatus::Success();
+}
+
+double TWorkersPool::GetWorkerCPUUsage(const ui32 workerIdx) const {
+    AFL_VERIFY(WorkersCountDouble);
+    double wholePart;
+    const double fractionalPart = std::modf(WorkersCountDouble, &wholePart);
+    if (workerIdx + 1 <= wholePart) {
+        // Workers below the integer part of the configured count get a full core.
+        return 1;
+    } else {
+        // Only the last worker may sit beyond the integer part; it carries the fractional remainder of the quota.
+        AFL_VERIFY(workerIdx == wholePart);
+        AFL_VERIFY(fractionalPart)("count", WorkersCountDouble);
+        return fractionalPart;
+    }
+}
+
+const TCategory& TConfig::GetCategoryConfig(const ESpecialTaskCategory cat) const {
+ AFL_VERIFY((ui64)cat < Categories.size());
+ return Categories[(ui64)cat];
+}
+
+TString TConfig::DebugString() const {
+ TStringBuilder sb;
+ sb << "{";
+ sb << "{Categories:[";
+ for (auto&& c : Categories) {
+ sb << c.DebugString() << ";";
+ }
+ sb << "]};";
+ sb << "{WorkerPools:[";
+ for (auto&& wp : WorkerPools) {
+ sb << wp.DebugString() << ";";
+ }
+ sb << "]};";
+ sb << "Enabled=" << EnabledFlag << ";";
+ sb << "}";
+ return sb;
+}
+
+TWorkersPool::TWorkersPool(const ui32 wpId, const double workersCountDouble)
+ : WorkersPoolId(wpId)
+ , WorkersCountDouble(workersCountDouble) {
+ AFL_VERIFY(WorkersCountDouble);
+}
+
+TConclusionStatus TWorkersPool::DeserializeFromProto(
+ const NKikimrConfig::TCompositeConveyorConfig::TWorkersPool& proto, const ui64 usableThreadsCount) {
+ if (!proto.GetLinks().size()) {
+ return TConclusionStatus::Fail("no categories for workers pool");
+ }
+ for (auto&& c : proto.GetLinks()) {
+ TWorkerPoolCategoryUsage link;
+ auto conclusion = link.DeserializeFromProto(c);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ Links.emplace_back(std::move(link));
+ }
+ if (Links.empty()) {
+ return TConclusionStatus::Fail("no links for workers pool");
+ }
+ if (proto.HasWorkersCount()) {
+ WorkersCountDouble = proto.GetWorkersCount();
+ } else if (proto.HasDefaultFractionOfThreadsCount()) {
+ WorkersCountDouble = usableThreadsCount * proto.GetDefaultFractionOfThreadsCount();
+ } else {
+ WorkersCountDouble = usableThreadsCount;
+ }
+ if (WorkersCountDouble <= 0) {
+ return TConclusionStatus::Fail(
+ "incorrect WorkersCount calculated: " + proto.DebugString() + " for " + ::ToString(usableThreadsCount) + " threads");
+ }
+
+ return TConclusionStatus::Success();
+}
+
+TString TWorkersPool::DebugString() const {
+ TStringBuilder sb;
+ sb << "{";
+ sb << "id=" << WorkersPoolId << ";";
+ sb << "count=" << WorkersCountDouble << ";";
+ TStringBuilder sbLinks;
+ sbLinks << "[";
+ for (auto&& l : Links) {
+ sbLinks << l.DebugString() << ";";
+ }
+ sbLinks << "]";
+ sb << "links=" << sbLinks << ";";
+ sb << "}";
+ return sb;
+}
+
+ui32 TWorkersPool::GetWorkersCount() const {
+ AFL_VERIFY(WorkersCountDouble);
+ return ceil(WorkersCountDouble);
+}
+
+TString TCategory::DebugString() const {
+ TStringBuilder sb;
+ sb << "{";
+ sb << "category=" << Category << ";";
+ sb << "queue_limit=" << QueueSizeLimit << ";";
+ sb << "pools=" << JoinSeq(",", WorkerPools) << ";";
+ sb << "}";
+ return sb;
+}
+
+TString TWorkerPoolCategoryUsage::DebugString() const {
+ TStringBuilder sb;
+ sb << "{";
+ sb << "c=" << Category << ";";
+ sb << "w=" << Weight << ";";
+ sb << "}";
+ return sb;
+}
+
+} // namespace NKikimr::NConveyorComposite::NConfig
+
+namespace NKikimr::NConveyorComposite {
+TCPULimitsConfig::TCPULimitsConfig(const double cpuGroupThreadsLimit, const double weight)
+ : CPUGroupThreadsLimit(cpuGroupThreadsLimit)
+ , Weight(weight) {
+}
+
+TConclusionStatus TCPULimitsConfig::DeserializeFromProto(const NKikimrTxDataShard::TEvKqpScan& config) {
+ if (config.HasCpuGroupThreadsLimit()) {
+ CPUGroupThreadsLimit = config.GetCpuGroupThreadsLimit();
+ }
+ return TConclusionStatus::Success();
+}
+
+TString TCPULimitsConfig::DebugString() const {
+ TStringBuilder sb;
+ if (CPUGroupThreadsLimit) {
+ sb << "CPUGroupThreadsLimit=" << *CPUGroupThreadsLimit << ";";
+ } else {
+ sb << "Disabled;";
+ }
+ return sb;
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/usage/config.h b/ydb/core/tx/conveyor_composite/usage/config.h
new file mode 100644
index 00000000000..2f074b24bb2
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/config.h
@@ -0,0 +1,138 @@
+#pragma once
+#include "common.h"
+
+#include <ydb/core/protos/config.pb.h>
+#include <ydb/core/protos/tx_datashard.pb.h>
+#include <ydb/core/tx/conveyor/usage/config.h>
+
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/conclusion/status.h>
+
+namespace NKikimr::NConveyorComposite::NConfig {
+
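+// Links one task category to a workers pool with a scheduling weight (deserialized from TWorkerPoolCategoryLink).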
+class TWorkerPoolCategoryUsage {
+private:
+ YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert);
+ YDB_READONLY(double, Weight, 1);
+
+public:
+ TWorkerPoolCategoryUsage() = default;
+
+ TWorkerPoolCategoryUsage(const ESpecialTaskCategory cat)
+ : Category(cat) {
+ }
+
+ TString DebugString() const;
+
+ [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrConfig::TCompositeConveyorConfig::TWorkerPoolCategoryLink& proto) {
+ if (!TryFromString<ESpecialTaskCategory>(proto.GetCategory(), Category)) {
+ return TConclusionStatus::Fail("cannot parse category link: " + proto.GetCategory());
+ }
+ if (proto.HasWeight()) {
+ if (proto.GetWeight() <= 0) {
+ return TConclusionStatus::Fail("incorrect category link weight: " + ::ToString(proto.GetWeight()));
+ }
+ Weight = proto.GetWeight();
+ }
+ return TConclusionStatus::Success();
+ }
+};
+
+class TWorkersPool {
+private:
+ YDB_READONLY(ui32, WorkersPoolId, 0);
+ YDB_READONLY(double, WorkersCountDouble, 0);
+ YDB_READONLY_DEF(std::vector<TWorkerPoolCategoryUsage>, Links);
+
+public:
+ double GetWorkerCPUUsage(const ui32 workerIdx) const;
+ ui32 GetWorkersCount() const;
+
+ bool AddLink(const ESpecialTaskCategory cat) {
+ for (auto&& i : Links) {
+ if (i.GetCategory() == cat) {
+ return false;
+ }
+ }
+ Links.emplace_back(TWorkerPoolCategoryUsage(cat));
+ return true;
+ }
+
+ TString DebugString() const;
+
+ TWorkersPool(const ui32 wpId)
+ : WorkersPoolId(wpId) {
+ }
+
+ TWorkersPool(const ui32 wpId, const double workersCountDouble);
+
+ [[nodiscard]] TConclusionStatus DeserializeFromProto(
+ const NKikimrConfig::TCompositeConveyorConfig::TWorkersPool& proto, const ui64 usableThreadsCount);
+};
+
+class TCategory {
+private:
+ YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert);
+ YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024);
+ YDB_READONLY_DEF(std::vector<ui32>, WorkerPools);
+ YDB_READONLY_FLAG(Default, true);
+
+public:
+ TString DebugString() const;
+
+ [[nodiscard]] bool AddWorkerPool(const ui32 id) {
+ for (auto&& i : WorkerPools) {
+ if (i == id) {
+ return false;
+ }
+ }
+ WorkerPools.emplace_back(id);
+ return true;
+ }
+
+ [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrConfig::TCompositeConveyorConfig::TCategory& proto) {
+ if (!TryFromString<ESpecialTaskCategory>(proto.GetName(), Category)) {
+ return TConclusionStatus::Fail("cannot parse category: " + proto.GetName());
+ }
+ if (proto.HasQueueSizeLimit()) {
+ QueueSizeLimit = proto.GetQueueSizeLimit();
+ }
+ DefaultFlag = false;
+ return TConclusionStatus::Success();
+ }
+
+ TCategory(const ESpecialTaskCategory cat)
+ : Category(cat) {
+ }
+};
+
+class TConfig {
+private:
+ YDB_READONLY_DEF(std::vector<TCategory>, Categories);
+ YDB_READONLY_DEF(std::vector<TWorkersPool>, WorkerPools);
+ YDB_READONLY_FLAG(Enabled, true);
+
+public:
+ const TCategory& GetCategoryConfig(const ESpecialTaskCategory cat) const;
+
+ [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrConfig::TCompositeConveyorConfig& config, const ui64 usableThreadsCount);
+
+ TString DebugString() const;
+};
+
+} // namespace NKikimr::NConveyorComposite::NConfig
+
+namespace NKikimr::NConveyorComposite {
+class TCPULimitsConfig {
+ YDB_OPT(double, CPUGroupThreadsLimit);
+ YDB_READONLY(double, Weight, 1);
+
+public:
+ TCPULimitsConfig() = default;
+ TCPULimitsConfig(const double cpuGroupThreadsLimit, const double weight = 1);
+
+ TConclusionStatus DeserializeFromProto(const NKikimrTxDataShard::TEvKqpScan& config);
+ TString DebugString() const;
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/usage/events.cpp b/ydb/core/tx/conveyor_composite/usage/events.cpp
new file mode 100644
index 00000000000..5c94bd68be2
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/events.cpp
@@ -0,0 +1,15 @@
+#include "events.h"
+
+#include <ydb/library/actors/core/log.h>
+
+namespace NKikimr::NConveyorComposite {
+
+TEvExecution::TEvNewTask::TEvNewTask(ITask::TPtr task, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId)
+ : Task(task)
+ , Category(category)
+ , ScopeId(scopeId)
+ , ProcessId(processId) {
+ AFL_VERIFY(Task);
+}
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/usage/events.h b/ydb/core/tx/conveyor_composite/usage/events.h
new file mode 100644
index 00000000000..a3169dad9bd
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/events.h
@@ -0,0 +1,69 @@
+#pragma once
+#include "common.h"
+#include "config.h"
+
+#include <ydb/core/base/events.h>
+
+#include <ydb/library/actors/core/event_local.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/conclusion/result.h>
+
+namespace NKikimr::NConveyorComposite {
+
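+// Local events of the composite conveyor service: task submission and process register/unregister,
+// all keyed by (category, scope id, process id).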
+struct TEvExecution {
+ enum EEv {
+ EvNewTask = EventSpaceBegin(TKikimrEvents::ES_CONVEYOR_COMPOSITE),
+ EvRegisterProcess,
+ EvUnregisterProcess,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CONVEYOR_COMPOSITE), "expected EvEnd < EventSpaceEnd");
+
+ class TEvNewTask: public NActors::TEventLocal<TEvNewTask, EvNewTask> {
+ private:
+ YDB_READONLY_DEF(ITask::TPtr, Task);
+ YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert);
+ YDB_READONLY_DEF(TString, ScopeId);
+ YDB_READONLY(ui64, ProcessId, 0);
+ YDB_READONLY(TMonotonic, ConstructInstant, TMonotonic::Now());
+
+ public:
+ TEvNewTask() = default;
+
+ explicit TEvNewTask(ITask::TPtr task, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId);
+ };
+
+ class TEvRegisterProcess: public NActors::TEventLocal<TEvRegisterProcess, EvRegisterProcess> {
+ private:
+ YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert);
+ YDB_READONLY_DEF(TString, ScopeId);
+ YDB_READONLY(ui64, ProcessId, 0);
+ YDB_READONLY_DEF(TCPULimitsConfig, CPULimits);
+
+ public:
+ explicit TEvRegisterProcess(
+ const TCPULimitsConfig& cpuLimits, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId)
+ : Category(category)
+ , ScopeId(scopeId)
+ , ProcessId(processId)
+ , CPULimits(cpuLimits) {
+ }
+ };
+
+ class TEvUnregisterProcess: public NActors::TEventLocal<TEvUnregisterProcess, EvUnregisterProcess> {
+ private:
+ YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert);
+ YDB_READONLY_DEF(TString, ScopeId);
+ YDB_READONLY(ui64, ProcessId, 0);
+
+ public:
+ explicit TEvUnregisterProcess(const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId)
+ : Category(category)
+ , ScopeId(scopeId)
+ , ProcessId(processId) {
+ }
+ };
+};
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/usage/service.cpp b/ydb/core/tx/conveyor_composite/usage/service.cpp
new file mode 100644
index 00000000000..eb3953901e4
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/service.cpp
@@ -0,0 +1,5 @@
+#include "service.h"
+
+namespace NKikimr::NConveyorComposite {
+
+}
diff --git a/ydb/core/tx/conveyor_composite/usage/service.h b/ydb/core/tx/conveyor_composite/usage/service.h
new file mode 100644
index 00000000000..336b5b5b9da
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/service.h
@@ -0,0 +1,83 @@
+#pragma once
+#include "common.h"
+#include "config.h"
+
+#include <ydb/core/tx/conveyor_composite/service/service.h>
+#include <ydb/core/tx/conveyor_composite/usage/events.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/actorid.h>
+
+namespace NKikimr::NConveyorComposite {
+template <class TConveyorPolicy>
+class TServiceOperatorImpl {
+private:
+ using TSelf = TServiceOperatorImpl<TConveyorPolicy>;
+ std::atomic<bool> IsEnabledFlag = false;
+ static void Register(const NConfig::TConfig& serviceConfig) {
+ Singleton<TSelf>()->IsEnabledFlag = serviceConfig.IsEnabled();
+ }
+ static const TString& GetConveyorName() {
+ Y_ABORT_UNLESS(TConveyorPolicy::Name.size() == 4);
+ return TConveyorPolicy::Name;
+ }
+
+public:
+ static bool SendTaskToExecute(const std::shared_ptr<ITask>& task, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId) {
+ if (TSelf::IsEnabled() && NActors::TlsActivationContext) {
+ auto& context = NActors::TActorContext::AsActorContext();
+ const NActors::TActorId& selfId = context.SelfID;
+ context.Send(MakeServiceId(selfId.NodeId()), new NConveyorComposite::TEvExecution::TEvNewTask(task, category, scopeId, processId));
+ return true;
+ } else {
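+            // Conveyor disabled or no actor context: degrade to inline execution on the caller's thread.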
+ task->Execute(nullptr, task);
+ return false;
+ }
+ }
+ static bool IsEnabled() {
+ return Singleton<TSelf>()->IsEnabledFlag;
+ }
+ static NActors::TActorId MakeServiceId(const ui32 nodeId) {
+ return NActors::TActorId(nodeId, "SrvcConv" + GetConveyorName());
+ }
+ static NActors::IActor* CreateService(const NConfig::TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) {
+ Register(config);
+ return new TDistributor(config, GetConveyorName(), conveyorSignals);
+ }
+ static TProcessGuard StartProcess(
+ const ESpecialTaskCategory category, const TString& scopeId, const ui64 externalProcessId, const TCPULimitsConfig& cpuLimits) {
+ if (TSelf::IsEnabled() && NActors::TlsActivationContext) {
+ auto& context = NActors::TActorContext::AsActorContext();
+ const NActors::TActorId& selfId = context.SelfID;
+ context.Send(MakeServiceId(selfId.NodeId()),
+ new NConveyorComposite::TEvExecution::TEvRegisterProcess(cpuLimits, category, scopeId, externalProcessId));
+ return TProcessGuard(category, scopeId, externalProcessId, MakeServiceId(selfId.NodeId()));
+ } else {
+ return TProcessGuard(category, scopeId, externalProcessId, {});
+ }
+ }
+};
+
+class TScanConveyorPolicy {
+public:
+ static const inline TString Name = "Scan";
+ static constexpr bool EnableProcesses = true;
+};
+
+class TCompConveyorPolicy {
+public:
+ static const inline TString Name = "Comp";
+ static constexpr bool EnableProcesses = false;
+};
+
+class TInsertConveyorPolicy {
+public:
+ static const inline TString Name = "Isrt";
+ static constexpr bool EnableProcesses = false;
+};
+
+using TScanServiceOperator = TServiceOperatorImpl<TScanConveyorPolicy>;
+using TCompServiceOperator = TServiceOperatorImpl<TCompConveyorPolicy>;
+using TInsertServiceOperator = TServiceOperatorImpl<TInsertConveyorPolicy>;
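+
+// Typical call site (a sketch; assumes the service was created via CreateService and registered on this node):
+//   auto guard = TInsertServiceOperator::StartProcess(ESpecialTaskCategory::Insert, "scope", processId, TCPULimitsConfig());
+//   TInsertServiceOperator::SendTaskToExecute(task, ESpecialTaskCategory::Insert, "scope", processId);
+// The guard sends TEvUnregisterProcess when it goes out of scope.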
+
+} // namespace NKikimr::NConveyorComposite
diff --git a/ydb/core/tx/conveyor_composite/usage/ya.make b/ydb/core/tx/conveyor_composite/usage/ya.make
new file mode 100644
index 00000000000..5e307111624
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/usage/ya.make
@@ -0,0 +1,17 @@
+LIBRARY()
+
+SRCS(
+ events.cpp
+ config.cpp
+ service.cpp
+ common.cpp
+)
+
+PEERDIR(
+ ydb/library/actors/core
+ ydb/services/metadata/request
+)
+
+GENERATE_ENUM_SERIALIZATION(common.h)
+
+END()
diff --git a/ydb/core/tx/conveyor_composite/ut/ut_simple.cpp b/ydb/core/tx/conveyor_composite/ut/ut_simple.cpp
new file mode 100644
index 00000000000..046394f3c71
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/ut/ut_simple.cpp
@@ -0,0 +1,425 @@
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+#include <ydb/core/tx/conveyor_composite/usage/config.h>
+#include <ydb/core/tx/conveyor_composite/usage/events.h>
+#include <ydb/core/tx/conveyor_composite/usage/service.h>
+
+#include <ydb/library/actors/core/executor_pool_basic.h>
+#include <ydb/library/actors/core/scheduler_basic.h>
+
+#include <contrib/libs/protobuf/src/google/protobuf/text_format.h>
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/generic/xrange.h>
+#include <util/string/printf.h>
+
+using namespace NKikimr::NConveyorComposite;
+
+namespace NKikimr {
+
+THolder<TActorSystemSetup> BuildActorSystemSetup(const ui32 threads, const ui32 pools) {
+ Y_ABORT_UNLESS(threads > 0 && threads < 100);
+ Y_ABORT_UNLESS(pools > 0 && pools < 10);
+
+ auto setup = MakeHolder<NActors::TActorSystemSetup>();
+
+ setup->NodeId = 1;
+
+ setup->ExecutorsCount = pools;
+ setup->Executors.Reset(new TAutoPtr<NActors::IExecutorPool>[pools]);
+ for (ui32 idx : xrange(pools)) {
+ setup->Executors[idx] = new NActors::TBasicExecutorPool(idx, threads, 50);
+ }
+
+ setup->Scheduler = new NActors::TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0));
+
+ return setup;
+}
+
+} // namespace NKikimr
+
+class TSleepTask: public NKikimr::NConveyor::ITask {
+private:
+ const TDuration ExecutionTime;
+ TAtomicCounter* Counter;
+ virtual void DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) override {
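+        // Busy-wait instead of Sleep(): the task must hold the worker thread to emulate CPU-bound work for these scheduling tests.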
+ const TMonotonic start = TMonotonic::Now();
+ while (TMonotonic::Now() - start < ExecutionTime) {
+ }
+ Counter->Inc();
+ }
+
+public:
+ virtual TString GetTaskClassIdentifier() const override {
+ return "SLEEP";
+ }
+
+ TSleepTask(const TDuration d, TAtomicCounter& c)
+ : ExecutionTime(d)
+ , Counter(&c) {
+ }
+};
+
+class IRequestProcessor {
+private:
+ YDB_READONLY_DEF(TString, Id);
+ virtual void DoInitialize(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) = 0;
+ virtual void DoAddTask(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) = 0;
+ virtual bool DoCheckFinished() = 0;
+ virtual void DoFinish(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId, const TDuration d) = 0;
+ virtual TString DoDebugString() const = 0;
+
+public:
+ void Initialize(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) {
+ DoInitialize(actorSystem, distributorId);
+ }
+ void AddTask(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) {
+ DoAddTask(actorSystem, distributorId);
+ }
+ bool CheckFinished() {
+ return DoCheckFinished();
+ }
+ void Finish(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId, const TDuration d) {
+ DoFinish(actorSystem, distributorId, d);
+ }
+ TString DebugString() const {
+ return TStringBuilder() << "{" << Id << ":" << DoDebugString() << "}";
+ }
+ IRequestProcessor(const TString& id)
+ : Id(id) {
+ }
+ virtual ~IRequestProcessor() = default;
+};
+
+class TSimpleRequest: public IRequestProcessor {
+private:
+ YDB_ACCESSOR(double, ScopeWeight, 1);
+ const ESpecialTaskCategory Category;
+ const TString ScopeId;
+ const ui64 ProcessId;
+ TAtomicCounter Counter;
+ TAtomicCounter CounterTasks;
+ virtual TString DoDebugString() const override {
+ return TStringBuilder() << Counter.Val();
+ }
+
+ virtual void DoInitialize(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) override {
+ actorSystem.Send(distributorId, new TEvExecution::TEvRegisterProcess(TCPULimitsConfig(1000, ScopeWeight), Category, ScopeId, ProcessId));
+ }
+ virtual void DoAddTask(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) override {
+ actorSystem.Send(distributorId,
+ new TEvExecution::TEvNewTask(std::make_shared<TSleepTask>(TDuration::MicroSeconds(40), Counter), Category, ScopeId, ProcessId));
+ CounterTasks.Inc();
+ }
+ virtual bool DoCheckFinished() override {
+ return CounterTasks.Val() == Counter.Val();
+ }
+ virtual void DoFinish(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId, const TDuration /*d*/) override {
+ actorSystem.Send(distributorId, new TEvExecution::TEvUnregisterProcess(Category, ScopeId, ProcessId));
+ }
+
+public:
+ TSimpleRequest(const TString& id, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId)
+ : IRequestProcessor(id)
+ , Category(category)
+ , ScopeId(scopeId)
+ , ProcessId(processId) {
+ }
+};
+
+class TTestingExecutor {
+private:
+ virtual TString GetConveyorConfig() = 0;
+ virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() = 0;
+ virtual ui32 GetTasksCount() const {
+ return 1000000;
+ }
+public:
+ virtual double GetThreadsCount() const {
+ return 9.5;
+ }
+
+ void Execute() {
+ const ui64 threadsCount = 64;
+ THolder<NActors::TActorSystemSetup> actorSystemSetup = NKikimr::BuildActorSystemSetup(threadsCount, 1);
+ NActors::TActorSystem actorSystem(actorSystemSetup);
+
+ actorSystem.Start();
+ auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
+ const std::string textProto = GetConveyorConfig();
+ NKikimrConfig::TCompositeConveyorConfig protoConfig;
+ AFL_VERIFY(google::protobuf::TextFormat::ParseFromString(textProto, &protoConfig));
+
+ NConfig::TConfig config;
+ config.DeserializeFromProto(protoConfig, threadsCount).Validate();
+ const auto actorId = actorSystem.Register(TCompServiceOperator::CreateService(config, counters));
+
+ std::vector<std::shared_ptr<IRequestProcessor>> requests = GetRequests();
+ for (auto&& i : requests) {
+ i->Initialize(actorSystem, actorId);
+ }
+        for (ui32 taskIdx = 0; taskIdx < GetTasksCount(); ++taskIdx) {
+            for (auto&& request : requests) {
+                request->AddTask(actorSystem, actorId);
+            }
+        }
+ const TMonotonic globalStart = TMonotonic::Now();
+ std::vector<TDuration> durations;
+ durations.resize(requests.size());
+ {
+ bool isFinished = false;
+ while (!isFinished) {
+ isFinished = true;
+ ui32 idx = 0;
+ TStringBuilder sb;
+ for (auto&& i : requests) {
+ if (!i->CheckFinished()) {
+ isFinished = false;
+ } else if (!durations[idx]) {
+ durations[idx] = TMonotonic::Now() - globalStart;
+ }
+ sb << i->DebugString() << ";";
+ ++idx;
+ }
+ Cerr << sb << Endl;
+ if (!isFinished) {
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+ }
+ {
+ ui32 idx = 0;
+ for (auto&& i : requests) {
+ i->Finish(actorSystem, actorId, durations[idx]);
+ ++idx;
+ }
+ }
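+        // Rough per-task cost: wall time scaled by the configured thread count, spread over every submitted task.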
+ Cerr << (GetThreadsCount() * (TMonotonic::Now() - globalStart) / (1.0 * requests.size() * GetTasksCount())).MicroSeconds() << "us per task"
+ << Endl;
+ TStringBuilder sb;
+ for (auto&& i : durations) {
+ sb << i << ";";
+ }
+ Cerr << sb << Endl;
+ Sleep(TDuration::Seconds(5));
+
+ actorSystem.Stop();
+ actorSystem.Cleanup();
+    }
+};
+
+Y_UNIT_TEST_SUITE(CompositeConveyorTests) {
+ class TTestingExecutor10xDistribution: public TTestingExecutor {
+ private:
+ virtual TString GetConveyorConfig() override {
+ return Sprintf(R"(
+ WorkerPools {
+ WorkersCount: %f
+ Links {
+ Category: "insert"
+ Weight: 0.1
+ }
+ Links {
+ Category: "scan"
+ Weight: 0.01
+ }
+ Links {
+ Category: "normalizer"
+ Weight: 0.001
+ }
+ }
+ Categories {
+ Name: "insert"
+ }
+ Categories {
+ Name: "normalizer"
+ }
+ Categories {
+ Name: "scan"
+ }
+ )",
+ GetThreadsCount());
+ }
+ virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override {
+ return { std::make_shared<TSimpleRequest>("I", ESpecialTaskCategory::Insert, "1", 1),
+ std::make_shared<TSimpleRequest>("S", ESpecialTaskCategory::Scan, "1", 1),
+ std::make_shared<TSimpleRequest>("N", ESpecialTaskCategory::Normalizer, "1", 1) };
+ }
+
+ public:
+ };
+ Y_UNIT_TEST(Test10xDistribution) {
+ TTestingExecutor10xDistribution().Execute();
+ }
+
+ class TTestingExecutor10xMultiDistribution: public TTestingExecutor {
+ private:
+ virtual TString GetConveyorConfig() override {
+ return Sprintf(R"(
+ WorkerPools {
+ WorkersCount: %f
+ Links {
+ Category: "scan"
+ Weight: 1
+ }
+ }
+ WorkerPools {
+ WorkersCount: %f
+ Links {
+ Category: "insert"
+ Weight: 0.1
+ }
+ Links {
+ Category: "scan"
+ Weight: 0.01
+ }
+ Links {
+ Category: "normalizer"
+ Weight: 0.001
+ }
+ }
+ Categories {
+ Name: "insert"
+ }
+ Categories {
+ Name: "normalizer"
+ }
+ Categories {
+ Name: "scan"
+ }
+ )",
+ GetThreadsCount(), GetThreadsCount());
+ }
+ virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override {
+ return { std::make_shared<TSimpleRequest>("I", ESpecialTaskCategory::Insert, "1", 1),
+ std::make_shared<TSimpleRequest>("S", ESpecialTaskCategory::Scan, "1", 1),
+ std::make_shared<TSimpleRequest>("N", ESpecialTaskCategory::Normalizer, "1", 1) };
+ }
+
+ public:
+ };
+ Y_UNIT_TEST(Test10xMultiDistribution) {
+ TTestingExecutor10xMultiDistribution().Execute();
+ }
+
+ class TTestingExecutorUniformProcessDistribution: public TTestingExecutor {
+ private:
+ virtual TString GetConveyorConfig() override {
+ return Sprintf(R"(
+ WorkerPools {
+ WorkersCount: %f
+ Links {
+ Category: "insert"
+ Weight: 1
+ }
+ }
+ Categories {
+ Name: "insert"
+ }
+ )",
+ GetThreadsCount());
+ }
+ virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override {
+ return { std::make_shared<TSimpleRequest>("1", ESpecialTaskCategory::Insert, "1", 1),
+ std::make_shared<TSimpleRequest>("2", ESpecialTaskCategory::Insert, "1", 2),
+ std::make_shared<TSimpleRequest>("3", ESpecialTaskCategory::Insert, "1", 3) };
+ }
+
+ public:
+ };
+ Y_UNIT_TEST(TestUniformProcessDistribution) {
+ TTestingExecutorUniformProcessDistribution().Execute();
+ }
+
+ class TTestingExecutorUniformScopesDistribution: public TTestingExecutor {
+ private:
+ virtual TString GetConveyorConfig() override {
+ return Sprintf(R"(
+ WorkerPools {
+ WorkersCount: %f
+ Links {
+ Category: "insert"
+ Weight: 1
+ }
+ }
+ Categories {
+ Name: "insert"
+ }
+ )",
+ GetThreadsCount());
+ }
+ virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override {
+ return { std::make_shared<TSimpleRequest>("1", ESpecialTaskCategory::Insert, "1", 1),
+ std::make_shared<TSimpleRequest>("2", ESpecialTaskCategory::Insert, "2", 1),
+ std::make_shared<TSimpleRequest>("3", ESpecialTaskCategory::Insert, "3", 1) };
+ }
+
+ public:
+ };
+ Y_UNIT_TEST(TestUniformScopesDistribution) {
+ TTestingExecutorUniformScopesDistribution().Execute();
+ }
+
+ class TTestingExecutorUniformDistribution: public TTestingExecutor {
+ private:
+ virtual ui32 GetTasksCount() const override {
+ return 1000000;
+ }
+ virtual double GetThreadsCount() const override {
+ return 16.4;
+ }
+ virtual TString GetConveyorConfig() override {
+ return Sprintf(R"(
+ WorkerPools {
+ WorkersCount: %f
+ Links {
+ Category: "insert"
+ Weight: 0.1
+ }
+ Links {
+ Category: "scan"
+ Weight: 0.1
+ }
+ Links {
+ Category: "normalizer"
+ Weight: 0.1
+ }
+ }
+ Categories {
+ Name: "insert"
+ }
+ Categories {
+ Name: "normalizer"
+ }
+ Categories {
+ Name: "scan"
+ }
+ )",
+ GetThreadsCount());
+ }
+ virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override {
+ return {
+ std::make_shared<TSimpleRequest>("I_1_1", ESpecialTaskCategory::Insert, "1", 1),
+ std::make_shared<TSimpleRequest>("I_2_1", ESpecialTaskCategory::Insert, "2", 1),
+ std::make_shared<TSimpleRequest>("I_3_1", ESpecialTaskCategory::Insert, "3", 1),
+ std::make_shared<TSimpleRequest>("S_1_1", ESpecialTaskCategory::Scan, "1", 1),
+ std::make_shared<TSimpleRequest>("S_2_1", ESpecialTaskCategory::Scan, "2", 1),
+ std::make_shared<TSimpleRequest>("S_3_1", ESpecialTaskCategory::Scan, "3", 1),
+ std::make_shared<TSimpleRequest>("N_1_1", ESpecialTaskCategory::Normalizer, "1", 1),
+ std::make_shared<TSimpleRequest>("N_2_1", ESpecialTaskCategory::Normalizer, "2", 1),
+ std::make_shared<TSimpleRequest>("N_3_1", ESpecialTaskCategory::Normalizer, "3", 1),
+ std::make_shared<TSimpleRequest>("I_1_2", ESpecialTaskCategory::Insert, "1", 2),
+ std::make_shared<TSimpleRequest>("I_2_2", ESpecialTaskCategory::Insert, "2", 2),
+ std::make_shared<TSimpleRequest>("I_3_2", ESpecialTaskCategory::Insert, "3", 2),
+ std::make_shared<TSimpleRequest>("S_1_2", ESpecialTaskCategory::Scan, "1", 2),
+ std::make_shared<TSimpleRequest>("S_2_2", ESpecialTaskCategory::Scan, "2", 2),
+ std::make_shared<TSimpleRequest>("S_3_2", ESpecialTaskCategory::Scan, "3", 2),
+ std::make_shared<TSimpleRequest>("N_1_2", ESpecialTaskCategory::Normalizer, "1", 2),
+ std::make_shared<TSimpleRequest>("N_2_2", ESpecialTaskCategory::Normalizer, "2", 2),
+ std::make_shared<TSimpleRequest>("N_3_2", ESpecialTaskCategory::Normalizer, "3", 2)
+ };
+ }
+
+ public:
+ };
+ Y_UNIT_TEST(TestUniformDistribution) {
+ TTestingExecutorUniformDistribution().Execute();
+ }
+}
diff --git a/ydb/core/tx/conveyor_composite/ut/ya.make b/ydb/core/tx/conveyor_composite/ut/ya.make
new file mode 100644
index 00000000000..9daa068c504
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/ut/ya.make
@@ -0,0 +1,30 @@
+UNITTEST_FOR(ydb/core/tx/conveyor_composite/service)
+
+FORK_SUBTESTS()
+
+SPLIT_FACTOR(60)
+SIZE(MEDIUM)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/base
+ ydb/core/tablet
+ ydb/core/tablet_flat
+ ydb/core/tx/columnshard/counters
+ yql/essentials/sql/pg_dummy
+ yql/essentials/core/arrow_kernels/request
+ ydb/core/testlib/default
+ ydb/core/tx/columnshard/test_helper
+ ydb/core/tx/columnshard/hooks/abstract
+ ydb/core/tx/columnshard/hooks/testing
+
+ yql/essentials/udfs/common/json2
+)
+
+YQL_LAST_ABI_VERSION()
+
+SRCS(
+ ut_simple.cpp
+)
+
+END()
diff --git a/ydb/core/tx/conveyor_composite/ya.make b/ydb/core/tx/conveyor_composite/ya.make
new file mode 100644
index 00000000000..efa8055bf69
--- /dev/null
+++ b/ydb/core/tx/conveyor_composite/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+)
+
+PEERDIR(
+ ydb/core/tx/conveyor_composite/service
+ ydb/core/tx/conveyor_composite/usage
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/core/tx/ya.make b/ydb/core/tx/ya.make
index b3fcead3c1e..544315c8e56 100644
--- a/ydb/core/tx/ya.make
+++ b/ydb/core/tx/ya.make
@@ -24,6 +24,8 @@ END()
RECURSE(
balance_coverage
    columnshard
+    conveyor
+    conveyor_composite
coordinator
datashard