author    | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-05-25 10:42:32 +0300
committer | GitHub <noreply@github.com> | 2025-05-25 10:42:32 +0300
commit    | d617cc14c32e50f51615fe1d3a4c783289b10641 (patch)
tree      | 855cc9a002c364649091f1f615c586b95892c22f
parent    | e727aaf2a0bedfa37ddb8205a1f5d922b501eab0 (diff)
41 files changed, 2468 insertions, 9 deletions
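This commit introduces a "composite conveyor" service (NKikimr::NConveyorComposite): task categories (insert, compaction, normalizer, scan) are mapped onto weighted worker pools, with CPU usage accounted per category, scope, and process. The layout is driven by the new TCompositeConveyorConfig message added to config.proto in the first hunk below. As a minimal illustrative sketch only (not part of the commit), assuming the Arcadia-style CamelCase protobuf accessors that the diff itself uses (GetEnabled(), GetWorkerPools(), GetLinks(), ...), such a config could be built like this:

    #include <ydb/core/protos/config.pb.h>

    NKikimrConfig::TCompositeConveyorConfig MakeExampleConfig() {
        NKikimrConfig::TCompositeConveyorConfig config;
        config.SetEnabled(true);

        // Category names mirror the ESpecialTaskCategory string aliases
        // declared in conveyor_composite/usage/common.h.
        auto* scan = config.AddCategories();
        scan->SetName("scan");
        scan->SetQueueSizeLimit(1000);

        // One pool sized as a fraction of the usable thread count; per
        // TWorkersPool::DeserializeFromProto below, an explicit WorkersCount
        // takes precedence over DefaultFractionOfThreadsCount.
        auto* pool = config.AddWorkerPools();
        pool->SetDefaultFractionOfThreadsCount(0.5);
        auto* scanLink = pool->AddLinks();
        scanLink->SetCategory("scan");
        scanLink->SetWeight(2.0);
        auto* compactionLink = pool->AddLinks();
        compactionLink->SetCategory("compaction");
        compactionLink->SetWeight(1.0);
        return config;
    }

Per TConfig::DeserializeFromProto in usage/config.cpp, categories not mentioned in the config are linked to a default pool sized by the usable thread count, duplicate category entries and double links are rejected, and every category must end up linked to at least one pool.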
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 4e652a4f322..eb9010f205d 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -185,6 +185,7 @@ struct TKikimrEvents : TEvents { ES_FEATURE_FLAGS = 4262, ES_PRIORITY_QUEUE = 4263, ES_SOLOMON_PROVIDER = 4264, + ES_CONVEYOR_COMPOSITE = 4265, }; }; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2baeb7f4f11..8630b12e5eb 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -654,6 +654,28 @@ message TConveyorConfig { optional double WorkersCountDouble = 5; } +message TCompositeConveyorConfig { + message TCategory { + optional string Name = 1; + optional uint64 QueueSizeLimit = 2; + } + + message TWorkerPoolCategoryLink { + optional string Category = 1; + optional double Weight = 2; + } + + message TWorkersPool { + optional double WorkersCount = 1; + optional double DefaultFractionOfThreadsCount = 2; + repeated TWorkerPoolCategoryLink Links = 3; + } + + optional bool Enabled = 1 [default = true]; + repeated TWorkersPool WorkerPools = 2; + repeated TCategory Categories = 3; +} + message TPrioritiesQueueConfig { optional bool Enabled = 1 [default = true]; optional uint32 Limit = 2 [default = 32]; @@ -1116,7 +1138,7 @@ message TQueryServiceConfig { optional uint32 QueryTimeoutDefaultSeconds = 19 [default = 1800]; optional bool EnableMatchRecognize = 20 [default = false]; repeated string AvailableExternalDataSources = 22; // Ignored if AllExternalDataSourcesAreAvailable is true - optional bool AllExternalDataSourcesAreAvailable = 23 [default = true]; + optional bool AllExternalDataSourcesAreAvailable = 23 [default = true]; } // Config describes immediate controls and allows @@ -2375,6 +2397,7 @@ message TAppConfig { optional NKikimrBlobStorage.TYamlConfig StoredConfigYaml = 105; optional string StartupConfigYaml = 107; optional string StartupStorageYaml = 108; + optional TCompositeConveyorConfig CompositeConveyorConfig = 109; } message TYdbVersion { diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp index 07d4e6fe709..c58ae15bed8 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp @@ -651,6 +651,7 @@ void TPortionDataAccessor::FullValidation() const { TBlobRange::Validate(PortionInfo->GetMeta().GetBlobIds(), i.GetBlobRange()).Validate(); blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified()); } + AFL_VERIFY(GetRecordsVerified().size()); for (auto&& i : GetIndexesVerified()) { if (auto bRange = i.GetBlobRangeOptional()) { TBlobRange::Validate(PortionInfo->GetMeta().GetBlobIds(), *bRange).Validate(); diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp b/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp index dfa57b5bee8..329efccbd77 100644 --- a/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp +++ b/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp @@ -77,4 +77,4 @@ void TWriteAggregation::Flush(const ui64 tabletId) { } } -} // namespace NKikimr::NColumnShard::NWritingPortions +} // namespace NKikimr::NOlap::NWritingPortions diff --git a/ydb/core/tx/conveyor/usage/abstract.h b/ydb/core/tx/conveyor/usage/abstract.h index 909136d4263..c9fa9da33bc 100644 --- a/ydb/core/tx/conveyor/usage/abstract.h +++ b/ydb/core/tx/conveyor/usage/abstract.h @@ -19,6 +19,14 @@ public: NMonitoring::TDynamicCounters::TCounterPtr Success; NMonitoring::TDynamicCounters::TCounterPtr 
SuccessDuration; + TTaskSignals(const NColumnShard::TCommonCountersOwner& baseObject, const TString& taskClassIdentifier) + : TBase(baseObject, "task_class_name", taskClassIdentifier) { + Fails = TBase::GetDeriviative("Fails"); + FailsDuration = TBase::GetDeriviative("FailsDuration"); + Success = TBase::GetDeriviative("Success"); + SuccessDuration = TBase::GetDeriviative("SuccessDuration"); + } + TTaskSignals(const TString& moduleId, const TString& taskClassIdentifier, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals = nullptr) : TBase(moduleId, baseSignals) { DeepSubGroup("task_class", taskClassIdentifier); diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h index 2acd2ede377..a943ec29b62 100644 --- a/ydb/core/tx/conveyor/usage/service.h +++ b/ydb/core/tx/conveyor/usage/service.h @@ -1,20 +1,21 @@ #pragma once #include "config.h" -#include <ydb/library/actors/core/actorid.h> -#include <ydb/library/actors/core/actor.h> + #include <ydb/core/tx/conveyor/service/service.h> #include <ydb/core/tx/conveyor/usage/events.h> +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/actorid.h> + namespace NKikimr::NConveyor { class TAsyncTaskExecutor: public TActorBootstrapped<TAsyncTaskExecutor> { private: const std::shared_ptr<ITask> Task; + public: TAsyncTaskExecutor(const std::shared_ptr<ITask>& task) - : Task(task) - { - + : Task(task) { } void Bootstrap() { @@ -23,6 +24,12 @@ public: } }; +enum class ESpecialTaskProcesses { + Insert = 1, + Compaction = 2, + Normalizer = 3 +}; + template <class TConveyorPolicy> class TServiceOperatorImpl { private: @@ -35,11 +42,15 @@ private: Y_ABORT_UNLESS(TConveyorPolicy::Name.size() == 4); return TConveyorPolicy::Name; } + public: static void AsyncTaskToExecute(const std::shared_ptr<ITask>& task) { auto& context = NActors::TActorContext::AsActorContext(); context.Register(new TAsyncTaskExecutor(task)); } + static bool SendTaskToExecute(const std::shared_ptr<ITask>& task, const ESpecialTaskProcesses processType) { + return SendTaskToExecute(task, (ui64)processType); + } static bool SendTaskToExecute(const std::shared_ptr<ITask>& task, const ui64 processId = 0) { if (TSelf::IsEnabled() && NActors::TlsActivationContext) { auto& context = NActors::TActorContext::AsActorContext(); @@ -71,7 +82,6 @@ public: return TProcessGuard(externalProcessId, {}); } } - }; class TScanConveyorPolicy { @@ -96,4 +106,4 @@ using TScanServiceOperator = TServiceOperatorImpl<TScanConveyorPolicy>; using TCompServiceOperator = TServiceOperatorImpl<TCompConveyorPolicy>; using TInsertServiceOperator = TServiceOperatorImpl<TInsertConveyorPolicy>; -} +} // namespace NKikimr::NConveyor diff --git a/ydb/core/tx/conveyor/ya.make b/ydb/core/tx/conveyor/ya.make new file mode 100644 index 00000000000..e5c415537d4 --- /dev/null +++ b/ydb/core/tx/conveyor/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( +) + +PEERDIR( + ydb/core/tx/conveyor/service + ydb/core/tx/conveyor/usage +) + +END() diff --git a/ydb/core/tx/conveyor_composite/service/category.cpp b/ydb/core/tx/conveyor_composite/service/category.cpp new file mode 100644 index 00000000000..4c87ae85d0d --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/category.cpp @@ -0,0 +1,81 @@ +#include "category.h" + +namespace NKikimr::NConveyorComposite { + +bool TProcessCategory::HasTasks() const { + for (auto&& i : Scopes) { + if (i.second->HasTasks()) { + return true; + } + } + return false; +} + +void TProcessCategory::DoQuant(const TMonotonic newStart) { + 
CPUUsage->Cut(newStart); + for (auto&& i : Scopes) { + i.second->DoQuant(newStart); + } +} + +TWorkerTask TProcessCategory::ExtractTaskWithPrediction() { + std::shared_ptr<TProcessScope> scopeMin; + TDuration dMin; + for (auto&& [_, scope] : Scopes) { + if (!scope->HasTasks()) { + continue; + } + const TDuration d = scope->GetCPUUsage()->CalcWeight(scope->GetWeight()); + if (!scopeMin || d < dMin) { + dMin = d; + scopeMin = scope; + } + } + AFL_VERIFY(scopeMin); + return scopeMin->ExtractTaskWithPrediction(); +} + +TProcessScope& TProcessCategory::MutableProcessScope(const TString& scopeName) { + auto it = Scopes.find(scopeName); + AFL_VERIFY(it != Scopes.end())("cat", GetCategory())("scope", scopeName); + return *it->second; +} + +TProcessScope* TProcessCategory::MutableProcessScopeOptional(const TString& scopeName) { + auto it = Scopes.find(scopeName); + if (it != Scopes.end()) { + return it->second.get(); + } else { + return nullptr; + } +} + +TProcessScope& TProcessCategory::RegisterScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits) { + TCPUGroup::TPtr cpuGroup = std::make_shared<TCPUGroup>(processCpuLimits.GetCPUGroupThreadsLimitDef(256)); + auto info = Scopes.emplace(scopeId, std::make_shared<TProcessScope>(std::move(cpuGroup), CPUUsage)); + AFL_VERIFY(info.second); + return *info.first->second; +} + +TProcessScope& TProcessCategory::UpdateScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits) { + auto& scope = MutableProcessScope(scopeId); + scope.UpdateLimits(processCpuLimits); + return scope; +} + +TProcessScope& TProcessCategory::UpsertScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits) { + if (Scopes.contains(scopeId)) { + return UpdateScope(scopeId, processCpuLimits); + } else { + return RegisterScope(scopeId, processCpuLimits); + } +} + +void TProcessCategory::UnregisterScope(const TString& name) { + auto it = Scopes.find(name); + AFL_VERIFY(it != Scopes.end()); + AFL_VERIFY(!it->second->GetProcessesCount()); + Scopes.erase(it); +} + +} diff --git a/ydb/core/tx/conveyor_composite/service/category.h b/ydb/core/tx/conveyor_composite/service/category.h new file mode 100644 index 00000000000..78c8f9b6b61 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/category.h @@ -0,0 +1,50 @@ +#pragma once +#include "common.h" +#include "counters.h" +#include "scope.h" +#include "worker.h" + +#include <ydb/library/accessor/positive_integer.h> + +namespace NKikimr::NConveyorComposite { + +class TProcessCategory: public TNonCopyable { +private: + const ESpecialTaskCategory Category; + std::shared_ptr<TCPUUsage> CPUUsage = std::make_shared<TCPUUsage>(nullptr); + TPositiveControlInteger WaitingTasksCount; + YDB_READONLY_DEF(std::shared_ptr<TCategorySignals>, Counters); + THashMap<TString, std::shared_ptr<TProcessScope>> Scopes; + const NConfig::TCategory Config; + +public: + TProcessCategory(const NConfig::TCategory& config, TCounters& counters) + : Category(config.GetCategory()) + , Config(config) { + Counters = counters.GetCategorySignals(Category); + } + + ESpecialTaskCategory GetCategory() const { + return Category; + } + + void PutTaskResult(TWorkerTaskResult&& result) { + const TString id = result.GetScopeId(); + if (TProcessScope* scope = MutableProcessScopeOptional(id)) { + scope->PutTaskResult(std::move(result)); + } + } + + bool HasTasks() const; + void DoQuant(const TMonotonic newStart); + TWorkerTask ExtractTaskWithPrediction(); + TProcessScope& MutableProcessScope(const TString& scopeName); + TProcessScope* 
MutableProcessScopeOptional(const TString& scopeName); + TProcessScope& RegisterScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits); + TProcessScope& UpsertScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits); + + TProcessScope& UpdateScope(const TString& scopeId, const TCPULimitsConfig& processCpuLimits); + void UnregisterScope(const TString& name); +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/common.cpp b/ydb/core/tx/conveyor_composite/service/common.cpp new file mode 100644 index 00000000000..46a9129dc63 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/common.cpp @@ -0,0 +1,37 @@ +#include "common.h" + +#include <ydb/library/actors/core/log.h> + +namespace NKikimr::NConveyorComposite { + +void TCPUUsage::Exchange(const TDuration predicted, const TMonotonic start, const TMonotonic finish) { + Usage.emplace_back(TTaskCPUUsage(start, finish)); + AFL_VERIFY(predicted <= PredictedDuration)("predicted_delta", predicted)("predicted_sum", PredictedDuration); + Duration += Usage.back().GetDuration(); + PredictedDuration -= predicted; + if (Parent) { + Parent->Exchange(predicted, start, finish); + } +} + +void TCPUUsage::Cut(const TMonotonic start) { + ui32 idx = 0; + while (idx < Usage.size()) { + AFL_VERIFY(Usage[idx].GetDuration() <= Duration); + Duration -= Usage[idx].GetDuration(); + if (Usage[idx].GetFinish() <= start) { + std::swap(Usage[idx], Usage.back()); + Usage.pop_back(); + } else { + Usage[idx].Cut(start); + Duration += Usage[idx].GetDuration(); + ++idx; + } + } +} + +TCPUGroup::~TCPUGroup() { + AFL_VERIFY(ProcessesCount == 0); +} + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/common.h b/ydb/core/tx/conveyor_composite/service/common.h new file mode 100644 index 00000000000..b35e49a17f4 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/common.h @@ -0,0 +1,127 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/accessor/positive_integer.h> +#include <ydb/library/actors/core/monotonic.h> +#include <ydb/library/signals/owner.h> + +#include <util/datetime/base.h> + +namespace NKikimr::NConveyorComposite { + +class TTaskCPUUsage { +private: + YDB_READONLY_DEF(TMonotonic, Start); + YDB_READONLY_DEF(TMonotonic, Finish); + YDB_READONLY_DEF(TDuration, Duration); + +public: + void Cut(const TMonotonic start) { + AFL_VERIFY(start < Finish); + if (Start <= start) { + Start = start; + } + } + + TTaskCPUUsage(const TMonotonic start, const TMonotonic finish) + : Start(start) + , Finish(finish) + , Duration(finish - start) { + } +}; + +template <class T> +class TAverageCalcer { +private: + const ui32 Count = 100; + std::deque<T> Values; + T Sum = T(); + +public: + TAverageCalcer(const ui32 count = 100) + : Count(count) { + } + + void Add(const T value) { + Values.emplace_back(value); + Sum += value; + if (Values.size() > Count) { + Sum -= Values.front(); + Values.pop_front(); + } + } + + T GetValue() const { + return Values.size() ? 
(Sum / Values.size()) : T(); + } +}; + +class TCPUUsage { +private: + std::deque<TTaskCPUUsage> Usage; + YDB_READONLY_DEF(TDuration, Duration); + YDB_READONLY_DEF(TDuration, PredictedDuration); + std::shared_ptr<TCPUUsage> Parent; + +public: + TCPUUsage(const std::shared_ptr<TCPUUsage>& parent) + : Parent(parent) { + } + + TDuration CalcWeight(const double w) const { + if (w <= 0) { + return TDuration::Max(); + } else { + return (Duration + PredictedDuration) * w; + } + } + + void AddPredicted(const TDuration d) { + PredictedDuration += d; + if (Parent) { + Parent->AddPredicted(d); + } + } + + void AddUsage(const TTaskCPUUsage& usage) { + Usage.emplace_back(usage); + Duration += usage.GetDuration(); + if (Parent) { + Parent->AddUsage(usage); + } + } + + void Exchange(const TDuration predicted, const TMonotonic start, const TMonotonic finish); + void Cut(const TMonotonic start); +}; + +class TCPUGroup { + YDB_ACCESSOR_DEF(double, CPUThreadsLimit); + YDB_ACCESSOR(double, Weight, 1); + TPositiveControlInteger ProcessesCount; + +public: + using TPtr = std::shared_ptr<TCPUGroup>; + + TCPUGroup(const double cpuThreadsLimit, const double weight = 1) + : CPUThreadsLimit(cpuThreadsLimit) + , Weight(weight) + { + } + + ~TCPUGroup(); + + ui32 GetProcessesCount() const { + return ProcessesCount.Val(); + } + + bool DecProcesses() { + --ProcessesCount; + return ProcessesCount == 0; + } + + void IncProcesses() { + ++ProcessesCount; + } +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/counters.cpp b/ydb/core/tx/conveyor_composite/service/counters.cpp new file mode 100644 index 00000000000..59e2f702088 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/counters.cpp @@ -0,0 +1,31 @@ +#include "counters.h" + +namespace NKikimr::NConveyorComposite { + +TCounters::TCounters(const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals) + : TBase("CompositeConveyor/" + conveyorName, baseSignals) + , ProcessesCount(TBase::GetValue("Processes/Count")) + , WaitingQueueSize(TBase::GetValue("WaitingQueueSize")) + , WaitingQueueSizeLimit(TBase::GetValue("WaitingQueueSizeLimit")) + , AvailableWorkersCount(TBase::GetValue("AvailableWorkersCount")) + , WorkersCountLimit(TBase::GetValue("WorkersCountLimit")) + , AmountCPULimit(TBase::GetValue("AmountCPULimit")) + , IncomingRate(TBase::GetDeriviative("Incoming")) + , SolutionsRate(TBase::GetDeriviative("Solved")) + , OverlimitRate(TBase::GetDeriviative("Overlimit")) + , WaitWorkerRate(TBase::GetDeriviative("WaitWorker")) + , UseWorkerRate(TBase::GetDeriviative("UseWorker")) + , ChangeCPULimitRate(TBase::GetDeriviative("ChangeCPULimit")) + , WaitingHistogram(TBase::GetHistogram("Waiting/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50))) + , PackHistogram(TBase::GetHistogram("ExecutionPack/Count", NMonitoring::LinearHistogram(25, 1, 1))) + , PackExecuteHistogram(TBase::GetHistogram("PackExecute/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50))) + , TaskExecuteHistogram(TBase::GetHistogram("TaskExecute/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50))) + , SendBackHistogram(TBase::GetHistogram("SendBack/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50))) + , SendFwdHistogram(TBase::GetHistogram("SendForward/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50))) + , ReceiveTaskHistogram(TBase::GetHistogram("ReceiveTask/Duration/Us", NMonitoring::ExponentialHistogram(25, 2, 50))) + , SendBackDuration(TBase::GetDeriviative("SendBack/Duration/Us")) 
+ , SendFwdDuration(TBase::GetDeriviative("SendForward/Duration/Us")) + , ExecuteDuration(TBase::GetDeriviative("Execute/Duration/Us")) { +} + +} diff --git a/ydb/core/tx/conveyor_composite/service/counters.h b/ydb/core/tx/conveyor_composite/service/counters.h new file mode 100644 index 00000000000..e6b7f38fc8f --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/counters.h @@ -0,0 +1,79 @@ +#pragma once +#include <ydb/core/tx/conveyor/usage/abstract.h> +#include <ydb/core/tx/conveyor_composite/usage/common.h> + +#include <ydb/library/signals/owner.h> + +namespace NKikimr::NConveyorComposite { + +using TTaskSignals = NConveyor::TTaskSignals; + +class TCategorySignals: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + THashMap<TString, std::shared_ptr<TTaskSignals>> TaskClassSignals; + YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert); + +public: + TCategorySignals(NColumnShard::TCommonCountersOwner& base, const ESpecialTaskCategory cat) + : TBase(base, "category", ::ToString(cat)) + , Category(cat) { + } + + std::shared_ptr<TTaskSignals> GetTaskSignals(const TString& taskClassName) { + auto it = TaskClassSignals.find(taskClassName); + if (it == TaskClassSignals.end()) { + it = TaskClassSignals.emplace(taskClassName, std::make_shared<TTaskSignals>(*this, taskClassName)).first; + } + return it->second; + } +}; + +class TCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + THashMap<ESpecialTaskCategory, std::shared_ptr<TCategorySignals>> CategorySignals; + +public: + const ::NMonitoring::TDynamicCounters::TCounterPtr ProcessesCount; + + const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSize; + const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSizeLimit; + + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressSize; + + const ::NMonitoring::TDynamicCounters::TCounterPtr AvailableWorkersCount; + const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCountLimit; + const ::NMonitoring::TDynamicCounters::TCounterPtr AmountCPULimit; + + const ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRate; + const ::NMonitoring::TDynamicCounters::TCounterPtr SolutionsRate; + const ::NMonitoring::TDynamicCounters::TCounterPtr OverlimitRate; + const ::NMonitoring::TDynamicCounters::TCounterPtr WaitWorkerRate; + const ::NMonitoring::TDynamicCounters::TCounterPtr UseWorkerRate; + const ::NMonitoring::TDynamicCounters::TCounterPtr ChangeCPULimitRate; + + const ::NMonitoring::THistogramPtr WaitingHistogram; + const ::NMonitoring::THistogramPtr PackHistogram; + const ::NMonitoring::THistogramPtr PackExecuteHistogram; + const ::NMonitoring::THistogramPtr TaskExecuteHistogram; + const ::NMonitoring::THistogramPtr SendBackHistogram; + const ::NMonitoring::THistogramPtr SendFwdHistogram; + const ::NMonitoring::THistogramPtr ReceiveTaskHistogram; + + const ::NMonitoring::TDynamicCounters::TCounterPtr SendBackDuration; + const ::NMonitoring::TDynamicCounters::TCounterPtr SendFwdDuration; + const ::NMonitoring::TDynamicCounters::TCounterPtr ExecuteDuration; + + std::shared_ptr<TCategorySignals> GetCategorySignals(const ESpecialTaskCategory cat) { + auto it = CategorySignals.find(cat); + if (it == CategorySignals.end()) { + it = CategorySignals.emplace(cat, std::make_shared<TCategorySignals>(*this, cat)).first; + } + return it->second; + } + + TCounters(const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> 
baseSignals); +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/events.cpp b/ydb/core/tx/conveyor_composite/service/events.cpp new file mode 100644 index 00000000000..68c63c027ca --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/events.cpp @@ -0,0 +1,23 @@ +#include "events.h" + +#include <ydb/library/actors/core/log.h> + +namespace NKikimr::NConveyorComposite { + +TEvInternal::TEvTaskProcessedResult::TEvTaskProcessedResult( + std::vector<TWorkerTaskResult>&& results, const TDuration forwardSendDuration, const ui64 workerIdx, const ui64 workersPoolId) + : ForwardSendDuration(forwardSendDuration) + , Results(std::move(results)) + , WorkerIdx(workerIdx) + , WorkersPoolId(workersPoolId) { + AFL_VERIFY(Results.size()); +} + +TWorkerTaskResult::TWorkerTaskResult(const TWorkerTaskContext& context, const TMonotonic start, const TMonotonic finish) + : TBase(context) + , Start(start) + , Finish(finish) { + AFL_VERIFY(Start <= Finish); +} + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/events.h b/ydb/core/tx/conveyor_composite/service/events.h new file mode 100644 index 00000000000..dcafb80c675 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/events.h @@ -0,0 +1,117 @@ +#pragma once +#include "counters.h" + +#include <ydb/core/tx/conveyor_composite/usage/common.h> + +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/actors/core/events.h> +#include <ydb/library/actors/core/monotonic.h> +#include <ydb/library/conclusion/result.h> + +namespace NKikimr::NConveyorComposite { + +class TWorkerTaskContext { +private: + YDB_READONLY(TMonotonic, CreateInstant, TMonotonic::Now()); + YDB_READONLY_DEF(TDuration, PredictedDuration); + YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert); + YDB_READONLY_DEF(TString, ScopeId); + YDB_READONLY(ui64, ProcessId, 0); + +public: + TWorkerTaskContext(const TDuration prediction, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId) + : PredictedDuration(prediction) + , Category(category) + , ScopeId(scopeId) + , ProcessId(processId) { + } +}; + +class TWorkerTask; + +class TWorkerTaskResult: public TWorkerTaskContext { +private: + using TBase = TWorkerTaskContext; + YDB_READONLY_DEF(TMonotonic, Start); + YDB_READONLY_DEF(TMonotonic, Finish); + + TWorkerTaskResult(const TWorkerTaskContext& context, const TMonotonic start, const TMonotonic finish); + friend class TWorkerTask; + +public: + TDuration GetDuration() const { + return Finish - Start; + } + +}; + +class TWorkerTask: public TWorkerTaskContext { +private: + using TBase = TWorkerTaskContext; + YDB_READONLY_DEF(ITask::TPtr, Task); + YDB_READONLY_DEF(std::shared_ptr<TTaskSignals>, TaskSignals); + +public: + TWorkerTaskResult GetResult(const TMonotonic start, const TMonotonic finish) const { + return TWorkerTaskResult(*this, start, finish); + } + + TWorkerTask(const ITask::TPtr& task, const TDuration prediction, const ESpecialTaskCategory category, const TString& scopeId, + const std::shared_ptr<TTaskSignals>& taskSignals, const ui64 processId) + : TBase(prediction, category, scopeId, processId) + , Task(task) + , TaskSignals(taskSignals) { + Y_ABORT_UNLESS(task); + } + + bool operator<(const TWorkerTask& wTask) const { + return Task->GetPriority() < wTask.Task->GetPriority(); + } +}; + +struct TEvInternal { + enum EEv { + EvNewTask = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvTaskProcessedResult, + EvEnd + }; + + 
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expected EvEnd < EventSpaceEnd"); + + class TEvNewTask: public NActors::TEventLocal<TEvNewTask, EvNewTask> { + private: + std::vector<TWorkerTask> Tasks; + YDB_READONLY(TMonotonic, ConstructInstant, TMonotonic::Now()); + + public: + TEvNewTask() = default; + + std::vector<TWorkerTask>&& ExtractTasks() { + return std::move(Tasks); + } + + explicit TEvNewTask(std::vector<TWorkerTask>&& tasks) + : Tasks(std::move(tasks)) { + } + }; + + class TEvTaskProcessedResult: public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult> { + private: + using TBase = TConclusion<ITask::TPtr>; + YDB_READONLY_DEF(TDuration, ForwardSendDuration); + std::vector<TWorkerTaskResult> Results; + YDB_READONLY(TMonotonic, ConstructInstant, TMonotonic::Now()); + YDB_READONLY(ui64, WorkerIdx, 0); + YDB_READONLY(ui64, WorkersPoolId, 0); + + public: + std::vector<TWorkerTaskResult>&& DetachResults() { + return std::move(Results); + } + + TEvTaskProcessedResult( + std::vector<TWorkerTaskResult>&& results, const TDuration forwardSendDuration, const ui64 workerIdx, const ui64 workersPoolId); + }; +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/manager.cpp b/ydb/core/tx/conveyor_composite/service/manager.cpp new file mode 100644 index 00000000000..d62b06ae24a --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/manager.cpp @@ -0,0 +1,4 @@ +#include "category.h" + +namespace NKikimr::NConveyorComposite { +} diff --git a/ydb/core/tx/conveyor_composite/service/manager.h b/ydb/core/tx/conveyor_composite/service/manager.h new file mode 100644 index 00000000000..30c0cc7666f --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/manager.h @@ -0,0 +1,78 @@ +#pragma once +#include "category.h" +#include "workers_pool.h" + +#include <ydb/core/tx/conveyor_composite/usage/config.h> + +namespace NKikimr::NConveyorComposite { +class TTasksManager { +private: + const TString ConveyorName; + std::vector<std::shared_ptr<TWorkersPool>> WorkerPools; + std::vector<std::shared_ptr<TProcessCategory>> Categories; + const NActors::TActorId DistributorActorId; + const NConfig::TConfig Config; + TCounters& Counters; + +public: + TString DebugString() const { + TStringBuilder sb; + sb << "{"; + sb << ConveyorName << ":"; + for (auto&& wp : WorkerPools) { + sb << wp->GetMaxWorkerThreads() << ","; + } + sb << ";"; + sb << "}"; + return sb; + } + + void DoQuant(const TMonotonic newStart) { + for (auto&& c : Categories) { + c->DoQuant(newStart); + } + } + + TTasksManager(const TString& convName, const NConfig::TConfig& config, const NActors::TActorId distributorActorId, TCounters& counters) + : ConveyorName(convName) + , DistributorActorId(distributorActorId) + , Config(config) + , Counters(counters) + { + for (auto&& i : GetEnumAllValues<ESpecialTaskCategory>()) { + Categories.emplace_back(std::make_shared<TProcessCategory>(Config.GetCategoryConfig(i), Counters)); + } + for (auto&& i : Config.GetWorkerPools()) { + WorkerPools.emplace_back(std::make_shared<TWorkersPool>(ConveyorName, distributorActorId, i, Counters, Categories)); + } + } + + TWorkersPool& MutableWorkersPool(const ui32 workersPoolId) { + AFL_VERIFY(workersPoolId < WorkerPools.size()); + return *WorkerPools[workersPoolId]; + } + + [[nodiscard]] bool DrainTasks() { + bool result = false; + for (auto&& i : WorkerPools) { + if (i->DrainTasks()) { + result = true; + } + } + return result; + } + + TProcessCategory& MutableCategoryVerified(const 
ESpecialTaskCategory category) { + AFL_VERIFY((ui64)category < Categories.size()); + AFL_VERIFY(!!Categories[(ui64)category]); + return *Categories[(ui64)category]; + } + + const TProcessCategory& GetCategoryVerified(const ESpecialTaskCategory category) const { + AFL_VERIFY((ui64)category < Categories.size()); + AFL_VERIFY(!!Categories[(ui64)category]); + return *Categories[(ui64)category]; + } +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/process.cpp b/ydb/core/tx/conveyor_composite/service/process.cpp new file mode 100644 index 00000000000..b4bf9a8611f --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/process.cpp @@ -0,0 +1,5 @@ +#include "process.h" + +namespace NKikimr::NConveyorComposite { + +} diff --git a/ydb/core/tx/conveyor_composite/service/process.h b/ydb/core/tx/conveyor_composite/service/process.h new file mode 100644 index 00000000000..cbc58bb4de4 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/process.h @@ -0,0 +1,122 @@ +#pragma once +#include "common.h" +#include "worker.h" + +#include <ydb/core/tx/conveyor_composite/usage/config.h> +#include <ydb/core/tx/conveyor_composite/usage/events.h> + +#include <ydb/library/accessor/positive_integer.h> +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/library/actors/core/log.h> +#include <ydb/library/signals/owner.h> + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <queue> + +namespace NKikimr::NConveyorComposite { + +class TDequePriorityFIFO { +private: + std::map<ui32, std::deque<TWorkerTask>> Tasks; + ui32 Size = 0; + +public: + void push(const TWorkerTask& task) { + Tasks[(ui32)task.GetTask()->GetPriority()].emplace_back(task); + ++Size; + } + TWorkerTask pop() { + Y_ABORT_UNLESS(Size); + auto result = std::move(Tasks.rbegin()->second.front()); + Tasks.rbegin()->second.pop_front(); + if (Tasks.rbegin()->second.size() == 0) { + Tasks.erase(--Tasks.end()); + } + --Size; + return result; + } + ui32 size() const { + return Size; + } +}; + +class TProcessOrdered { +private: + YDB_READONLY(ui64, ProcessId, 0); + YDB_READONLY(ui64, CPUTime, 0); + +public: + TProcessOrdered(const ui64 processId, const ui64 cpuTime) + : ProcessId(processId) + , CPUTime(cpuTime) { + } + + bool operator<(const TProcessOrdered& item) const { + if (CPUTime < item.CPUTime) { + return true; + } + if (item.CPUTime < CPUTime) { + return false; + } + return ProcessId < item.ProcessId; + } +}; + +class TProcess: public TMoveOnly { +private: + YDB_READONLY(ui64, ProcessId, 0); + YDB_READONLY_DEF(std::shared_ptr<TCPUUsage>, CPUUsage); + YDB_ACCESSOR_DEF(TDequePriorityFIFO, Tasks); + TAverageCalcer<TDuration> AverageTaskDuration; + ui32 LinksCount = 0; + +public: + void DoQuant(const TMonotonic newStart) { + CPUUsage->Cut(newStart); + } + + bool HasTasks() const { + return Tasks.size(); + } + + TWorkerTask ExtractTaskWithPrediction() { + auto result = Tasks.pop(); + CPUUsage->AddPredicted(result.GetPredictedDuration()); + return result; + } + + void PutTaskResult(TWorkerTaskResult&& result) { + CPUUsage->Exchange(result.GetPredictedDuration(), result.GetStart(), result.GetFinish()); + AverageTaskDuration.Add(result.GetDuration()); + } + + [[nodiscard]] bool DecRegistration() { + AFL_VERIFY(LinksCount); + --LinksCount; + return LinksCount == 0; + } + + double GetWeight() const { + return 1.0; + } + + void IncRegistration() { + ++LinksCount; + } + + TProcess(const ui64 processId, const std::shared_ptr<TCPUUsage>& scopeUsage) + : ProcessId(processId) 
+ { + CPUUsage = std::make_shared<TCPUUsage>(scopeUsage); + IncRegistration(); + } + + void RegisterTask(const std::shared_ptr<ITask>& task, const TString& scopeId, const std::shared_ptr<TCategorySignals>& signals) { + TWorkerTask wTask(task, AverageTaskDuration.GetValue(), signals->GetCategory(), scopeId, + signals->GetTaskSignals(task->GetTaskClassIdentifier()), ProcessId); + Tasks.push(std::move(wTask)); + } +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/scope.cpp b/ydb/core/tx/conveyor_composite/service/scope.cpp new file mode 100644 index 00000000000..0613ee69e20 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/scope.cpp @@ -0,0 +1,38 @@ +#include "scope.h" + +namespace NKikimr::NConveyorComposite { + +bool TProcessScope::HasTasks() const { + for (auto&& i : Processes) { + if (i.second.HasTasks()) { + return true; + } + } + return false; +} + +TWorkerTask TProcessScope::ExtractTaskWithPrediction() { + TProcess* pMin = nullptr; + TDuration dMin = TDuration::Max(); + for (auto&& [_, p] : Processes) { + if (!p.HasTasks()) { + continue; + } + const TDuration d = p.GetCPUUsage()->CalcWeight(p.GetWeight()); + if (!pMin || d < dMin) { + pMin = &p; + dMin = d; + } + } + AFL_VERIFY(pMin)("size", Processes.size()); + return pMin->ExtractTaskWithPrediction(); +} + +void TProcessScope::DoQuant(const TMonotonic newStart) { + CPUUsage->Cut(newStart); + for (auto&& i : Processes) { + i.second.DoQuant(newStart); + } +} + +} diff --git a/ydb/core/tx/conveyor_composite/service/scope.h b/ydb/core/tx/conveyor_composite/service/scope.h new file mode 100644 index 00000000000..b95a15dad20 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/scope.h @@ -0,0 +1,71 @@ +#pragma once +#include "common.h" +#include "process.h" + +namespace NKikimr::NConveyorComposite { + +class TProcessScope: public TNonCopyable { +private: + YDB_READONLY_DEF(std::shared_ptr<TCPUUsage>, CPUUsage); + TCPUGroup::TPtr ScopeLimits; + THashMap<ui64, TProcess> Processes; + +public: + double GetWeight() const { + return ScopeLimits->GetWeight(); + } + + ui32 GetProcessesCount() const { + return Processes.size(); + } + bool HasTasks() const; + + TWorkerTask ExtractTaskWithPrediction(); + + void DoQuant(const TMonotonic newStart); + + void UpdateLimits(const TCPULimitsConfig& processCpuLimits) { + ScopeLimits->SetCPUThreadsLimit(processCpuLimits.GetCPUGroupThreadsLimitDef(256)); + ScopeLimits->SetWeight(processCpuLimits.GetWeight()); + } + + void PutTaskResult(TWorkerTaskResult&& result) { + const ui64 id = result.GetProcessId(); + if (auto* process = MutableProcessOptional(id)) { + process->PutTaskResult(std::move(result)); + } + } + + TProcessScope(TCPUGroup::TPtr&& limits, const std::shared_ptr<TCPUUsage>& categoryScope) + : CPUUsage(std::make_shared<TCPUUsage>(categoryScope)) + , ScopeLimits(std::move(limits)) { + } + + TProcess& MutableProcessVerified(const ui64 processId) { + auto it = Processes.find(processId); + AFL_VERIFY(it != Processes.end()); + return it->second; + } + + TProcess* MutableProcessOptional(const ui64 processId) { + auto it = Processes.find(processId); + if (it != Processes.end()) { + return &it->second; + } else { + return nullptr; + } + } + + void RegisterProcess(const ui64 processId) { + TProcess process(processId, CPUUsage); + AFL_VERIFY(Processes.emplace(processId, std::move(process)).second); + ScopeLimits->IncProcesses(); + } + + bool UnregisterProcess(const ui64 processId) { + AFL_VERIFY(Processes.erase(processId)); + return 
ScopeLimits->DecProcesses(); + } +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/service.cpp b/ydb/core/tx/conveyor_composite/service/service.cpp new file mode 100644 index 00000000000..1195f7c1a08 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/service.cpp @@ -0,0 +1,69 @@ +#include "manager.h" +#include "service.h" + +#include <ydb/core/kqp/query_data/kqp_predictor.h> +#include <ydb/core/tx/conveyor_composite/usage/service.h> + +namespace NKikimr::NConveyorComposite { + +TDistributor::TDistributor( + const NConfig::TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) + : Config(config) + , ConveyorName(conveyorName) + , Counters(ConveyorName, conveyorSignals) { +} + +TDistributor::~TDistributor() { +} + +void TDistributor::Bootstrap() { + Manager = std::make_unique<TTasksManager>(ConveyorName, Config, SelfId(), Counters); + AFL_NOTICE(NKikimrServices::TX_CONVEYOR)("name", ConveyorName)("action", "conveyor_registered")("config", Config.DebugString())( + "actor_id", SelfId())("manager", Manager->DebugString()); + Become(&TDistributor::StateMain); + TBase::Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(1)); +} + +void TDistributor::HandleMain(NActors::TEvents::TEvWakeup::TPtr& evExt) { + if (evExt->Get()->Tag == 1) { + Manager->DoQuant(TMonotonic::Now()); + TBase::Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(1)); + } +} + +void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& evExt) { + TWorkersPool& workersPool = Manager->MutableWorkersPool(evExt->Get()->GetWorkersPoolId()); + workersPool.AddDeliveryDuration(evExt->Get()->GetForwardSendDuration() + (TMonotonic::Now() - evExt->Get()->GetConstructInstant())); + workersPool.ReleaseWorker(evExt->Get()->GetWorkerIdx()); + for (auto&& i : evExt->Get()->DetachResults()) { + workersPool.PutTaskResult(std::move(i)); + } + if (workersPool.HasTasks()) { + AFL_VERIFY(workersPool.DrainTasks()); + } +} + +void TDistributor::HandleMain(TEvExecution::TEvRegisterProcess::TPtr& ev) { + Manager->MutableCategoryVerified(ev->Get()->GetCategory()) + .UpsertScope(ev->Get()->GetScopeId(), ev->Get()->GetCPULimits()) + .RegisterProcess(ev->Get()->GetProcessId()); +} + +void TDistributor::HandleMain(TEvExecution::TEvUnregisterProcess::TPtr& ev) { + auto* evData = ev->Get(); + if (Manager->MutableCategoryVerified(evData->GetCategory()) + .MutableProcessScope(evData->GetScopeId()) + .UnregisterProcess(evData->GetProcessId())) { + Manager->MutableCategoryVerified(evData->GetCategory()).UnregisterScope(evData->GetScopeId()); + } +} + +void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) { + auto& cat = Manager->MutableCategoryVerified(ev->Get()->GetCategory()); + cat.MutableProcessScope(ev->Get()->GetScopeId()) + .MutableProcessVerified(ev->Get()->GetProcessId()) + .RegisterTask(ev->Get()->GetTask(), ev->Get()->GetScopeId(), cat.GetCounters()); + Y_UNUSED(Manager->DrainTasks()); +} + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/service.h b/ydb/core/tx/conveyor_composite/service/service.h new file mode 100644 index 00000000000..159fff0e416 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/service.h @@ -0,0 +1,67 @@ +#pragma once +#include "counters.h" +#include "events.h" + +#include <ydb/core/tx/conveyor_composite/usage/config.h> +#include <ydb/core/tx/conveyor_composite/usage/events.h> + +#include 
<ydb/library/accessor/positive_integer.h> +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/library/actors/core/log.h> +#include <ydb/library/signals/owner.h> + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <queue> + +namespace NKikimr::NConveyorComposite { +class TTasksManager; +class TDistributor: public TActorBootstrapped<TDistributor> { +private: + using TBase = TActorBootstrapped<TDistributor>; + const NConfig::TConfig Config; + const TString ConveyorName = "common"; + std::shared_ptr<TTasksManager> Manager; + TCounters Counters; + TMonotonic LastAddProcessInstant = TMonotonic::Now(); + + void HandleMain(TEvExecution::TEvNewTask::TPtr& ev); + void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev); + void HandleMain(TEvExecution::TEvRegisterProcess::TPtr& ev); + void HandleMain(TEvExecution::TEvUnregisterProcess::TPtr& ev); + void HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev); + + void AddProcess(const ui64 processId, const TCPULimitsConfig& cpuLimits); + + void AddCPUTime(const ui64 processId, const TDuration d); + + TWorkerTask PopTask(); + + void PushTask(const TWorkerTask& task); + + void ChangeAmountCPULimit(const double delta); + +public: + STATEFN(StateMain) { + // NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("name", ConveyorName) + // ("workers", Workers.size())("waiting", Waiting.size())("actor_id", SelfId()); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExecution::TEvNewTask, HandleMain); + hFunc(NActors::TEvents::TEvWakeup, HandleMain); + hFunc(TEvInternal::TEvTaskProcessedResult, HandleMain); + hFunc(TEvExecution::TEvRegisterProcess, HandleMain); + hFunc(TEvExecution::TEvUnregisterProcess, HandleMain); + default: + AFL_ERROR(NKikimrServices::TX_CONVEYOR)("problem", "unexpected event for task executor")("ev_type", ev->GetTypeName()); + break; + } + } + + TDistributor(const NConfig::TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals); + + ~TDistributor(); + + void Bootstrap(); +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/worker.cpp b/ydb/core/tx/conveyor_composite/service/worker.cpp new file mode 100644 index 00000000000..3ab805fe3ee --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/worker.cpp @@ -0,0 +1,64 @@ +#include "worker.h" + +namespace NKikimr::NConveyorComposite { + +TDuration TWorker::GetWakeupDuration() const { + AFL_VERIFY(ExecutionDuration); + return (*ExecutionDuration) * (1 - CPUSoftLimit) / CPUSoftLimit; +} + +void TWorker::ExecuteTask(std::vector<TWorkerTask>&& workerTasks) { + AFL_VERIFY(!ExecutionDuration && Results.empty()); + std::vector<TWorkerTaskResult> results; + results.reserve(workerTasks.size()); + const TMonotonic startGlobal = TMonotonic::Now(); + for (auto&& t : workerTasks) { + const TMonotonic start = TMonotonic::Now(); + t.GetTask()->Execute(t.GetTaskSignals(), t.GetTask()); + results.emplace_back(t.GetResult(start, TMonotonic::Now())); + } + if (CPUSoftLimit < 1) { + AFL_DEBUG(NKikimrServices::TX_CONVEYOR)("action", "to_wait_result")("id", SelfId())("count", workerTasks.size()); + ExecutionDuration = TMonotonic::Now() - startGlobal; + Results = std::move(results); + Schedule(GetWakeupDuration(), new NActors::TEvents::TEvWakeup(CPULimitGeneration)); + WaitWakeUp = true; + } else { + AFL_VERIFY(!!ForwardDuration); + AFL_DEBUG(NKikimrServices::TX_CONVEYOR)("action", "to_result")("id", SelfId())("count", Results.size())("d", 
TMonotonic::Now() - startGlobal); + TBase::Sender<TEvInternal::TEvTaskProcessedResult>(std::move(results), *ForwardDuration, WorkerIdx, WorkersPoolId).SendTo(DistributorId); + ForwardDuration.reset(); + } +} + +void TWorker::HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev) { + const auto evGeneration = ev->Get()->Tag; + AFL_VERIFY(evGeneration <= CPULimitGeneration); + if (evGeneration == CPULimitGeneration) { + OnWakeup(); + } +} + +void TWorker::OnWakeup() { + AFL_VERIFY(ExecutionDuration); + AFL_VERIFY(Results.size()); + AFL_VERIFY(!!ForwardDuration); + AFL_DEBUG(NKikimrServices::TX_CONVEYOR)("action", "wake_up")("id", SelfId())("count", Results.size()); + TBase::Sender<TEvInternal::TEvTaskProcessedResult>(std::move(Results), *ForwardDuration, WorkerIdx, WorkersPoolId).SendTo(DistributorId); + ForwardDuration.reset(); + Results.clear(); + ExecutionDuration.reset(); + + WaitWakeUp = false; +} + +void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) { + AFL_VERIFY(!WaitWakeUp); + const TMonotonic now = TMonotonic::Now(); + ForwardDuration = now - ev->Get()->GetConstructInstant(); + SendFwdHistogram->Collect(ForwardDuration->MicroSeconds()); + SendFwdDuration->Add(ForwardDuration->MicroSeconds()); + ExecuteTask(ev->Get()->ExtractTasks()); +} + +} diff --git a/ydb/core/tx/conveyor_composite/service/worker.h b/ydb/core/tx/conveyor_composite/service/worker.h new file mode 100644 index 00000000000..c8209c1e797 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/worker.h @@ -0,0 +1,69 @@ +#pragma once + +#include "counters.h" +#include "events.h" + +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/library/actors/core/event_local.h> +#include <ydb/library/actors/core/events.h> +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/actors/core/log.h> +#include <ydb/library/conclusion/result.h> +#include <ydb/library/services/services.pb.h> + +namespace NKikimr::NConveyorComposite { + +class TWorker: public NActors::TActorBootstrapped<TWorker> { +private: + using TBase = NActors::TActorBootstrapped<TWorker>; + const double CPUHardLimit = 1; + YDB_READONLY(double, CPUSoftLimit, 1); + ui64 CPULimitGeneration = 0; + bool WaitWakeUp = false; + std::optional<TDuration> ForwardDuration; + const NActors::TActorId DistributorId; + const ui64 WorkerIdx; + const ui64 WorkersPoolId; + std::optional<TDuration> ExecutionDuration; + std::vector<TWorkerTaskResult> Results; + const ::NMonitoring::THistogramPtr SendFwdHistogram; + const ::NMonitoring::TDynamicCounters::TCounterPtr SendFwdDuration; + TDuration GetWakeupDuration() const; + void ExecuteTask(std::vector<TWorkerTask>&& workerTasks); + void HandleMain(TEvInternal::TEvNewTask::TPtr& ev); + void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev); + void OnWakeup(); + +public: + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvInternal::TEvNewTask, HandleMain); + hFunc(NActors::TEvents::TEvWakeup, HandleMain); + default: + ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite(); + break; + } + } + + void Bootstrap() { + Become(&TWorker::StateMain); + } + + TWorker(const TString& conveyorName, const double cpuHardLimit, const NActors::TActorId& distributorId, const ui64 workerIdx, + const ui64 workersPoolId, const ::NMonitoring::THistogramPtr sendFwdHistogram, + const ::NMonitoring::TDynamicCounters::TCounterPtr sendFwdDuration) + : TBase("CONVEYOR::" + conveyorName + "::WORKER") + , 
CPUHardLimit(cpuHardLimit) + , CPUSoftLimit(cpuHardLimit) + , DistributorId(distributorId) + , WorkerIdx(workerIdx) + , WorkersPoolId(workersPoolId) + , SendFwdHistogram(sendFwdHistogram) + , SendFwdDuration(sendFwdDuration) { + AFL_VERIFY(0 < CPUHardLimit); + AFL_VERIFY(CPUHardLimit <= 1); + } +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/workers_pool.cpp b/ydb/core/tx/conveyor_composite/service/workers_pool.cpp new file mode 100644 index 00000000000..8a722186953 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/workers_pool.cpp @@ -0,0 +1,85 @@ +#include "workers_pool.h" + +namespace NKikimr::NConveyorComposite { +TWorkersPool::TWorkersPool(const TString& conveyorName, const NActors::TActorId& distributorId, const NConfig::TWorkersPool& config, + const TCounters& counters, const std::vector<std::shared_ptr<TProcessCategory>>& categories) + : WorkersCount(config.GetWorkersCount()) + , Counters(counters) { + Workers.reserve(WorkersCount); + for (auto&& i : config.GetLinks()) { + AFL_VERIFY((ui64)i.GetCategory() < categories.size()); + Processes.emplace_back(TWeightedCategory(i.GetWeight(), categories[(ui64)i.GetCategory()])); + } + AFL_VERIFY(Processes.size()); + for (ui32 i = 0; i < WorkersCount; ++i) { + Workers.emplace_back(std::make_unique<TWorker>(conveyorName, config.GetWorkerCPUUsage(i), distributorId, i, + config.GetWorkersPoolId(), Counters.SendFwdHistogram, Counters.SendFwdDuration)); + ActiveWorkersIdx.emplace_back(i); + } + AFL_VERIFY(WorkersCount)("name", conveyorName)("action", "conveyor_registered")("config", config.DebugString())("actor_id", distributorId)( + "count", WorkersCount); + Counters.WaitingQueueSizeLimit->Set(config.GetWorkersCountDouble()); + Counters.AmountCPULimit->Set(0); + Counters.AvailableWorkersCount->Set(0); + Counters.WorkersCountLimit->Set(WorkersCount); +} + +bool TWorkersPool::HasFreeWorker() const { + return !ActiveWorkersIdx.empty(); +} + +void TWorkersPool::RunTask(std::vector<TWorkerTask>&& tasksBatch) { + AFL_VERIFY(HasFreeWorker()); + const auto workerIdx = ActiveWorkersIdx.back(); + ActiveWorkersIdx.pop_back(); + Counters.AvailableWorkersCount->Set(ActiveWorkersIdx.size()); + + auto& worker = Workers[workerIdx]; + worker.OnStartTask(); + TActivationContext::Send(worker.GetWorkerId(), std::make_unique<TEvInternal::TEvNewTask>(std::move(tasksBatch))); +} + +void TWorkersPool::ReleaseWorker(const ui32 workerIdx) { + AFL_VERIFY(workerIdx < Workers.size()); + Workers[workerIdx].OnStopTask(); + ActiveWorkersIdx.emplace_back(workerIdx); + Counters.AvailableWorkersCount->Set(ActiveWorkersIdx.size()); +} + +bool TWorkersPool::DrainTasks() { + if (ActiveWorkersIdx.empty()) { + return false; + } + const auto predHeap = [](const TWeightedCategory& l, const TWeightedCategory& r) { + const bool hasL = l.GetCategory()->HasTasks(); + const bool hasR = r.GetCategory()->HasTasks(); + if (!hasL && !hasR) { + return false; + } else if (!hasL && hasR) { + return true; + } else if (hasL && !hasR) { + return false; + } + return r.GetCPUUsage()->CalcWeight(r.GetWeight()) < l.GetCPUUsage()->CalcWeight(l.GetWeight()); + }; + std::make_heap(Processes.begin(), Processes.end(), predHeap); + AFL_VERIFY(Processes.size()); + bool newTask = false; + while (ActiveWorkersIdx.size() && Processes.front().GetCategory()->HasTasks()) { + TDuration predicted = TDuration::Zero(); + std::vector<TWorkerTask> tasks; + while ((tasks.empty() || predicted < DeliveringDuration.GetValue() * 10) && 
Processes.front().GetCategory()->HasTasks()) { + std::pop_heap(Processes.begin(), Processes.end(), predHeap); + tasks.emplace_back(Processes.back().GetCategory()->ExtractTaskWithPrediction()); + Processes.back().GetCPUUsage()->AddPredicted(tasks.back().GetPredictedDuration()); + predicted += tasks.back().GetPredictedDuration(); + std::push_heap(Processes.begin(), Processes.end(), predHeap); + } + newTask = true; + AFL_VERIFY(tasks.size()); + RunTask(std::move(tasks)); + } + return newTask; +} + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/service/workers_pool.h b/ydb/core/tx/conveyor_composite/service/workers_pool.h new file mode 100644 index 00000000000..6fa9def6c9b --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/workers_pool.h @@ -0,0 +1,100 @@ +#pragma once +#include "category.h" +#include "common.h" +#include "worker.h" + +#include <ydb/library/actors/core/actorid.h> + +namespace NKikimr::NConveyorComposite { + +class TWeightedCategory { +private: + YDB_READONLY(std::shared_ptr<TCPUUsage>, CPUUsage, std::make_shared<TCPUUsage>(nullptr)); + YDB_READONLY_DEF(std::shared_ptr<TProcessCategory>, Category); + YDB_READONLY(double, Weight, 1); + +public: + TWeightedCategory(const double weight, const std::shared_ptr<TProcessCategory>& cat) + : Category(cat) + , Weight(weight) + { + AFL_VERIFY(cat); + AFL_VERIFY(Weight); + } +}; + +class TWorkersPool { +private: + class TWorkerInfo { + YDB_READONLY(bool, RunningTask, false); + YDB_READONLY(TWorker*, Worker, nullptr); + YDB_READONLY_DEF(NActors::TActorId, WorkerId); + + public: + explicit TWorkerInfo(std::unique_ptr<TWorker>&& worker) + : Worker(worker.get()) + , WorkerId(TActivationContext::Register(worker.release())) { + } + + void OnStartTask() { + AFL_VERIFY(!RunningTask); + RunningTask = true; + } + + void OnStopTask() { + AFL_VERIFY(RunningTask); + RunningTask = false; + } + }; + + YDB_READONLY(ui32, WorkersCount, 0); + YDB_READONLY(double, MaxWorkerThreads, 0); + YDB_READONLY(double, AmountCPULimit, 0); + std::vector<TWeightedCategory> Processes; + std::vector<TWorkerInfo> Workers; + std::vector<ui32> ActiveWorkersIdx; + TCounters Counters; + TAverageCalcer<TDuration> DeliveringDuration; + std::deque<TDuration> DeliveryDurations; + +public: + static constexpr double Eps = 1e-6; + using TPtr = std::shared_ptr<TWorkersPool>; + + TWorkersPool(const TString& conveyorName, const NActors::TActorId& distributorId, const NConfig::TWorkersPool& config, + const TCounters& counters, const std::vector<std::shared_ptr<TProcessCategory>>& categories); + + bool HasTasks() const { + for (auto&& i : Processes) { + if (i.GetCategory()->HasTasks()) { + return true; + } + } + return false; + } + + [[nodiscard]] bool DrainTasks(); + + void AddDeliveryDuration(const TDuration d) { + DeliveringDuration.Add(d); + } + + void PutTaskResult(TWorkerTaskResult&& result) { +// const ui32 catIdx = (ui32)result.GetCategory(); + for (auto&& i : Processes) { + if (i.GetCategory()->GetCategory() == result.GetCategory()) { +// AFL_VERIFY(catIdx < Processes.size()); +// AFL_VERIFY(Processes[catIdx]); + i.GetCPUUsage()->Exchange(result.GetPredictedDuration(), result.GetStart(), result.GetFinish()); + i.GetCategory()->PutTaskResult(std::move(result)); + return; + } + } + AFL_VERIFY(false); + } + bool HasFreeWorker() const; + void RunTask(std::vector<TWorkerTask>&& tasksBatch); + void ReleaseWorker(const ui32 workerIdx); +}; + +} // namespace NKikimr::NConveyorComposite diff --git 
a/ydb/core/tx/conveyor_composite/service/ya.make b/ydb/core/tx/conveyor_composite/service/ya.make new file mode 100644 index 00000000000..5ca71830673 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/service/ya.make @@ -0,0 +1,21 @@ +LIBRARY() + +SRCS( + worker.cpp + service.cpp + process.cpp + common.cpp + manager.cpp + workers_pool.cpp + category.cpp + scope.cpp + counters.cpp + events.cpp +) + +PEERDIR( + ydb/core/tx/conveyor_composite/usage + ydb/core/protos +) + +END() diff --git a/ydb/core/tx/conveyor_composite/usage/common.cpp b/ydb/core/tx/conveyor_composite/usage/common.cpp new file mode 100644 index 00000000000..aeaaaa4af88 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/common.cpp @@ -0,0 +1,18 @@ +#include "common.h" +#include "events.h" + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/log.h> + +namespace NKikimr::NConveyorComposite { + +void TProcessGuard::Finish() { + AFL_VERIFY(!Finished); + Finished = true; + if (ServiceActorId && NActors::TlsActivationContext) { + auto& context = NActors::TActorContext::AsActorContext(); + context.Send(*ServiceActorId, new TEvExecution::TEvUnregisterProcess(Category, ScopeId, ProcessId)); + } +} + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/usage/common.h b/ydb/core/tx/conveyor_composite/usage/common.h new file mode 100644 index 00000000000..612b3aa511e --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/common.h @@ -0,0 +1,44 @@ +#pragma once +#include <ydb/core/tx/conveyor/usage/abstract.h> + +namespace NKikimr::NConveyorComposite { +using ITask = NConveyor::ITask; + +enum class ESpecialTaskCategory { + Insert = 0 /* "insert" */, + Compaction = 1 /* "compaction" */, + Normalizer = 2 /* "normalizer" */, + Scan = 3 /* "scan" */ +}; + +class TProcessGuard: TNonCopyable { +private: + const ESpecialTaskCategory Category; + const TString ScopeId; + const ui64 ProcessId; + bool Finished = false; + const std::optional<NActors::TActorId> ServiceActorId; + +public: + ui64 GetProcessId() const { + return ProcessId; + } + + explicit TProcessGuard( + const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId, const std::optional<NActors::TActorId>& actorId) + : Category(category) + , ScopeId(scopeId) + , ProcessId(processId) + , ServiceActorId(actorId) { + } + + void Finish(); + + ~TProcessGuard() { + if (!Finished) { + Finish(); + } + } +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/usage/config.cpp b/ydb/core/tx/conveyor_composite/usage/config.cpp new file mode 100644 index 00000000000..e0fbda75b34 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/config.cpp @@ -0,0 +1,206 @@ +#include "config.h" + +#include <ydb/library/actors/core/log.h> + +#include <util/string/builder.h> +#include <util/string/join.h> + +namespace NKikimr::NConveyorComposite::NConfig { + +TConclusionStatus TConfig::DeserializeFromProto(const NKikimrConfig::TCompositeConveyorConfig& config, const ui64 usableThreadsCount) { + if (!config.HasEnabled()) { + EnabledFlag = true; + } else { + EnabledFlag = config.GetEnabled(); + } + for (auto&& i : GetEnumAllValues<ESpecialTaskCategory>()) { + Categories.emplace_back(TCategory(i)); + } + TWorkersPool* defWorkersPool = nullptr; + WorkerPools.reserve(1 + config.GetWorkerPools().size()); + if ((ui32)config.GetCategories().size() != GetEnumAllValues<ESpecialTaskCategory>().size()) { + TWorkersPool wp(WorkerPools.size(), usableThreadsCount); + 
WorkerPools.emplace_back(std::move(wp)); + defWorkersPool = &WorkerPools.front(); + } + std::set<ESpecialTaskCategory> usedCategories; + for (auto&& i : config.GetCategories()) { + TCategory cat(ESpecialTaskCategory::Insert); + auto conclusion = cat.DeserializeFromProto(i); + if (conclusion.IsFail()) { + return conclusion; + } + if (!usedCategories.emplace(cat.GetCategory()).second) { + return TConclusionStatus::Fail("category " + ::ToString(cat.GetCategory()) + " duplication"); + } + Categories[(ui64)cat.GetCategory()] = std::move(cat); + } + for (auto&& i : Categories) { + if (i.IsDefault()) { + AFL_VERIFY(defWorkersPool); + AFL_VERIFY(defWorkersPool->AddLink(i.GetCategory())); + AFL_VERIFY(i.AddWorkerPool(defWorkersPool->GetWorkersPoolId())); + } + } + for (auto&& i : config.GetWorkerPools()) { + TWorkersPool wp(WorkerPools.size()); + auto conclusion = wp.DeserializeFromProto(i, usableThreadsCount); + if (conclusion.IsFail()) { + return conclusion; + } + WorkerPools.emplace_back(std::move(wp)); + for (auto&& link : WorkerPools.back().GetLinks()) { + AFL_VERIFY((ui64)link.GetCategory() < Categories.size()); + auto& cat = Categories[(ui64)link.GetCategory()]; + if (!cat.AddWorkerPool(WorkerPools.back().GetWorkersPoolId())) { + return TConclusionStatus::Fail("double link for category: " + ::ToString(link.GetCategory())); + } + } + } + for (auto&& c : Categories) { + if (c.GetWorkerPools().empty()) { + return TConclusionStatus::Fail("no worker pools for category: " + ::ToString(c.GetCategory())); + } + } + return TConclusionStatus::Success(); +} + +double TWorkersPool::GetWorkerCPUUsage(const ui32 workerIdx) const { + AFL_VERIFY(WorkersCountDouble); + double wholePart; + const double fractionalPart = std::modf(WorkersCountDouble, &wholePart); + if (workerIdx + 1 <= wholePart) { + return 1; + } else { + AFL_VERIFY(workerIdx == wholePart); + AFL_VERIFY(fractionalPart)("count", WorkersCountDouble); + return fractionalPart; + } +} + +const TCategory& TConfig::GetCategoryConfig(const ESpecialTaskCategory cat) const { + AFL_VERIFY((ui64)cat < Categories.size()); + return Categories[(ui64)cat]; +} + +TString TConfig::DebugString() const { + TStringBuilder sb; + sb << "{"; + sb << "{Categories:["; + for (auto&& c : Categories) { + sb << c.DebugString() << ";"; + } + sb << "]};"; + sb << "{WorkerPools:["; + for (auto&& wp : WorkerPools) { + sb << wp.DebugString() << ";"; + } + sb << "]};"; + sb << "Enabled=" << EnabledFlag << ";"; + sb << "}"; + return sb; +} + +TWorkersPool::TWorkersPool(const ui32 wpId, const double workersCountDouble) + : WorkersPoolId(wpId) + , WorkersCountDouble(workersCountDouble) { + AFL_VERIFY(WorkersCountDouble); +} + +TConclusionStatus TWorkersPool::DeserializeFromProto( + const NKikimrConfig::TCompositeConveyorConfig::TWorkersPool& proto, const ui64 usableThreadsCount) { + if (!proto.GetLinks().size()) { + return TConclusionStatus::Fail("no categories for workers pool"); + } + for (auto&& c : proto.GetLinks()) { + TWorkerPoolCategoryUsage link; + auto conclusion = link.DeserializeFromProto(c); + if (conclusion.IsFail()) { + return conclusion; + } + Links.emplace_back(std::move(link)); + } + if (Links.empty()) { + return TConclusionStatus::Fail("no links for workers pool"); + } + if (proto.HasWorkersCount()) { + WorkersCountDouble = proto.GetWorkersCount(); + } else if (proto.HasDefaultFractionOfThreadsCount()) { + WorkersCountDouble = usableThreadsCount * proto.GetDefaultFractionOfThreadsCount(); + } else { + WorkersCountDouble = usableThreadsCount; + } + if 
(WorkersCountDouble <= 0) { + return TConclusionStatus::Fail( + "incorrect WorkersCount calculated: " + proto.DebugString() + " for " + ::ToString(usableThreadsCount) + " threads"); + } + + return TConclusionStatus::Success(); +} + +TString TWorkersPool::DebugString() const { + TStringBuilder sb; + sb << "{"; + sb << "id=" << WorkersPoolId << ";"; + sb << "count=" << WorkersCountDouble << ";"; + TStringBuilder sbLinks; + sbLinks << "["; + for (auto&& l : Links) { + sbLinks << l.DebugString() << ";"; + } + sbLinks << "]"; + sb << "links=" << sbLinks << ";"; + sb << "}"; + return sb; +} + +ui32 TWorkersPool::GetWorkersCount() const { + AFL_VERIFY(WorkersCountDouble); + return ceil(WorkersCountDouble); +} + +TString TCategory::DebugString() const { + TStringBuilder sb; + sb << "{"; + sb << "category=" << Category << ";"; + sb << "queue_limit=" << QueueSizeLimit << ";"; + sb << "pools=" << JoinSeq(",", WorkerPools) << ";"; + sb << "}"; + return sb; +} + +TString TWorkerPoolCategoryUsage::DebugString() const { + TStringBuilder sb; + sb << "{"; + sb << "c=" << Category << ";"; + sb << "w=" << Weight << ";"; + sb << "}"; + return sb; +} + +} // namespace NKikimr::NConveyorComposite::NConfig + +namespace NKikimr::NConveyorComposite { +TCPULimitsConfig::TCPULimitsConfig(const double cpuGroupThreadsLimit, const double weight) + : CPUGroupThreadsLimit(cpuGroupThreadsLimit) + , Weight(weight) { +} + +TConclusionStatus TCPULimitsConfig::DeserializeFromProto(const NKikimrTxDataShard::TEvKqpScan& config) { + if (config.HasCpuGroupThreadsLimit()) { + CPUGroupThreadsLimit = config.GetCpuGroupThreadsLimit(); + } + return TConclusionStatus::Success(); +} + +TString TCPULimitsConfig::DebugString() const { + TStringBuilder sb; + if (CPUGroupThreadsLimit) { + sb << "CPUGroupThreadsLimit=" << *CPUGroupThreadsLimit << ";"; + } else { + sb << "Disabled;"; + } + return sb; +} + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/usage/config.h b/ydb/core/tx/conveyor_composite/usage/config.h new file mode 100644 index 00000000000..2f074b24bb2 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/config.h @@ -0,0 +1,138 @@ +#pragma once +#include "common.h" + +#include <ydb/core/protos/config.pb.h> +#include <ydb/core/protos/tx_datashard.pb.h> +#include <ydb/core/tx/conveyor/usage/config.h> + +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/conclusion/status.h> + +namespace NKikimr::NConveyorComposite::NConfig { + +class TWorkerPoolCategoryUsage { +private: + YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert); + YDB_READONLY(double, Weight, 1); + +public: + TWorkerPoolCategoryUsage() = default; + + TWorkerPoolCategoryUsage(const ESpecialTaskCategory cat) + : Category(cat) { + } + + TString DebugString() const; + + [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrConfig::TCompositeConveyorConfig::TWorkerPoolCategoryLink& proto) { + if (!TryFromString<ESpecialTaskCategory>(proto.GetCategory(), Category)) { + return TConclusionStatus::Fail("cannot parse category link: " + proto.GetCategory()); + } + if (proto.HasWeight()) { + if (proto.GetWeight() <= 0) { + return TConclusionStatus::Fail("incorrect category link weight: " + ::ToString(proto.GetWeight())); + } + Weight = proto.GetWeight(); + } + return TConclusionStatus::Success(); + } +}; + +class TWorkersPool { +private: + YDB_READONLY(ui32, WorkersPoolId, 0); + YDB_READONLY(double, WorkersCountDouble, 0); + YDB_READONLY_DEF(std::vector<TWorkerPoolCategoryUsage>, Links); + 
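+    // note: WorkersCountDouble may be fractional; whole units give workers a full CPU share and the remainder gives one extra worker a partial share (see GetWorkerCPUUsage)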
+public: + double GetWorkerCPUUsage(const ui32 workerIdx) const; + ui32 GetWorkersCount() const; + + bool AddLink(const ESpecialTaskCategory cat) { + for (auto&& i : Links) { + if (i.GetCategory() == cat) { + return false; + } + } + Links.emplace_back(TWorkerPoolCategoryUsage(cat)); + return true; + } + + TString DebugString() const; + + TWorkersPool(const ui32 wpId) + : WorkersPoolId(wpId) { + } + + TWorkersPool(const ui32 wpId, const double workersCountDouble); + + [[nodiscard]] TConclusionStatus DeserializeFromProto( + const NKikimrConfig::TCompositeConveyorConfig::TWorkersPool& proto, const ui64 usableThreadsCount); +}; + +class TCategory { +private: + YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert); + YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024); + YDB_READONLY_DEF(std::vector<ui32>, WorkerPools); + YDB_READONLY_FLAG(Default, true); + +public: + TString DebugString() const; + + [[nodiscard]] bool AddWorkerPool(const ui32 id) { + for (auto&& i : WorkerPools) { + if (i == id) { + return false; + } + } + WorkerPools.emplace_back(id); + return true; + } + + [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrConfig::TCompositeConveyorConfig::TCategory& proto) { + if (!TryFromString<ESpecialTaskCategory>(proto.GetName(), Category)) { + return TConclusionStatus::Fail("cannot parse category: " + proto.GetName()); + } + if (proto.HasQueueSizeLimit()) { + QueueSizeLimit = proto.GetQueueSizeLimit(); + } + DefaultFlag = false; + return TConclusionStatus::Success(); + } + + TCategory(const ESpecialTaskCategory cat) + : Category(cat) { + } +}; + +class TConfig { +private: + YDB_READONLY_DEF(std::vector<TCategory>, Categories); + YDB_READONLY_DEF(std::vector<TWorkersPool>, WorkerPools); + YDB_READONLY_FLAG(Enabled, true); + +public: + const TCategory& GetCategoryConfig(const ESpecialTaskCategory cat) const; + + [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrConfig::TCompositeConveyorConfig& config, const ui64 usableThreadsCount); + + TString DebugString() const; +}; + +} // namespace NKikimr::NConveyorComposite::NConfig + +namespace NKikimr::NConveyorComposite { +class TCPULimitsConfig { + YDB_OPT(double, CPUGroupThreadsLimit); + YDB_READONLY(double, Weight, 1); + +public: + TCPULimitsConfig() = default; + TCPULimitsConfig(const double cpuGroupThreadsLimit, const double weight = 1); + + TConclusionStatus DeserializeFromProto(const NKikimrTxDataShard::TEvKqpScan& config); + TString DebugString() const; +}; + +} diff --git a/ydb/core/tx/conveyor_composite/usage/events.cpp b/ydb/core/tx/conveyor_composite/usage/events.cpp new file mode 100644 index 00000000000..5c94bd68be2 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/events.cpp @@ -0,0 +1,15 @@ +#include "events.h" + +#include <ydb/library/actors/core/log.h> + +namespace NKikimr::NConveyorComposite { + +TEvExecution::TEvNewTask::TEvNewTask(ITask::TPtr task, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId) + : Task(task) + , Category(category) + , ScopeId(scopeId) + , ProcessId(processId) { + AFL_VERIFY(Task); +} + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/usage/events.h b/ydb/core/tx/conveyor_composite/usage/events.h new file mode 100644 index 00000000000..a3169dad9bd --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/events.h @@ -0,0 +1,69 @@ +#pragma once +#include "common.h" +#include "config.h" + +#include <ydb/core/base/events.h> + +#include <ydb/library/actors/core/event_local.h> 
+#include <ydb/library/actors/core/events.h> +#include <ydb/library/conclusion/result.h> + +namespace NKikimr::NConveyorComposite { + +struct TEvExecution { + enum EEv { + EvNewTask = EventSpaceBegin(TKikimrEvents::ES_CONVEYOR_COMPOSITE), + EvRegisterProcess, + EvUnregisterProcess, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CONVEYOR_COMPOSITE), "expected EvEnd < EventSpaceEnd"); + + class TEvNewTask: public NActors::TEventLocal<TEvNewTask, EvNewTask> { + private: + YDB_READONLY_DEF(ITask::TPtr, Task); + YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert); + YDB_READONLY_DEF(TString, ScopeId); + YDB_READONLY(ui64, ProcessId, 0); + YDB_READONLY(TMonotonic, ConstructInstant, TMonotonic::Now()); + + public: + TEvNewTask() = default; + + explicit TEvNewTask(ITask::TPtr task, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId); + }; + + class TEvRegisterProcess: public NActors::TEventLocal<TEvRegisterProcess, EvRegisterProcess> { + private: + YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert); + YDB_READONLY_DEF(TString, ScopeId); + YDB_READONLY(ui64, ProcessId, 0); + YDB_READONLY_DEF(TCPULimitsConfig, CPULimits); + + public: + explicit TEvRegisterProcess( + const TCPULimitsConfig& cpuLimits, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId) + : Category(category) + , ScopeId(scopeId) + , ProcessId(processId) + , CPULimits(cpuLimits) { + } + }; + + class TEvUnregisterProcess: public NActors::TEventLocal<TEvUnregisterProcess, EvUnregisterProcess> { + private: + YDB_READONLY(ESpecialTaskCategory, Category, ESpecialTaskCategory::Insert); + YDB_READONLY_DEF(TString, ScopeId); + YDB_READONLY(ui64, ProcessId, 0); + + public: + explicit TEvUnregisterProcess(const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId) + : Category(category) + , ScopeId(scopeId) + , ProcessId(processId) { + } + }; +}; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/usage/service.cpp b/ydb/core/tx/conveyor_composite/usage/service.cpp new file mode 100644 index 00000000000..eb3953901e4 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/service.cpp @@ -0,0 +1,5 @@ +#include "service.h" + +namespace NKikimr::NConveyorComposite { + +} diff --git a/ydb/core/tx/conveyor_composite/usage/service.h b/ydb/core/tx/conveyor_composite/usage/service.h new file mode 100644 index 00000000000..336b5b5b9da --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/service.h @@ -0,0 +1,83 @@ +#pragma once +#include "common.h" +#include "config.h" + +#include <ydb/core/tx/conveyor_composite/service/service.h> +#include <ydb/core/tx/conveyor_composite/usage/events.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/actorid.h> + +namespace NKikimr::NConveyorComposite { +template <class TConveyorPolicy> +class TServiceOperatorImpl { +private: + using TSelf = TServiceOperatorImpl<TConveyorPolicy>; + std::atomic<bool> IsEnabledFlag = false; + static void Register(const NConfig::TConfig& serviceConfig) { + Singleton<TSelf>()->IsEnabledFlag = serviceConfig.IsEnabled(); + } + static const TString& GetConveyorName() { + Y_ABORT_UNLESS(TConveyorPolicy::Name.size() == 4); + return TConveyorPolicy::Name; + } + +public: + static bool SendTaskToExecute(const std::shared_ptr<ITask>& task, const ESpecialTaskCategory category, const TString& scopeId, const ui64 processId) { + if (TSelf::IsEnabled() && 
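/* note: both checks are required: the service must be enabled in config and the caller must be on an actor-system thread; otherwise the task is executed inline below */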
NActors::TlsActivationContext) { + auto& context = NActors::TActorContext::AsActorContext(); + const NActors::TActorId& selfId = context.SelfID; + context.Send(MakeServiceId(selfId.NodeId()), new NConveyorComposite::TEvExecution::TEvNewTask(task, category, scopeId, processId)); + return true; + } else { + task->Execute(nullptr, task); + return false; + } + } + static bool IsEnabled() { + return Singleton<TSelf>()->IsEnabledFlag; + } + static NActors::TActorId MakeServiceId(const ui32 nodeId) { + return NActors::TActorId(nodeId, "SrvcConv" + GetConveyorName()); + } + static NActors::IActor* CreateService(const NConfig::TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) { + Register(config); + return new TDistributor(config, GetConveyorName(), conveyorSignals); + } + static TProcessGuard StartProcess( + const ESpecialTaskCategory category, const TString& scopeId, const ui64 externalProcessId, const TCPULimitsConfig& cpuLimits) { + if (TSelf::IsEnabled() && NActors::TlsActivationContext) { + auto& context = NActors::TActorContext::AsActorContext(); + const NActors::TActorId& selfId = context.SelfID; + context.Send(MakeServiceId(selfId.NodeId()), + new NConveyorComposite::TEvExecution::TEvRegisterProcess(cpuLimits, category, scopeId, externalProcessId)); + return TProcessGuard(category, scopeId, externalProcessId, MakeServiceId(selfId.NodeId())); + } else { + return TProcessGuard(category, scopeId, externalProcessId, {}); + } + } +}; + +class TScanConveyorPolicy { +public: + static const inline TString Name = "Scan"; + static constexpr bool EnableProcesses = true; +}; + +class TCompConveyorPolicy { +public: + static const inline TString Name = "Comp"; + static constexpr bool EnableProcesses = false; +}; + +class TInsertConveyorPolicy { +public: + static const inline TString Name = "Isrt"; + static constexpr bool EnableProcesses = false; +}; + +using TScanServiceOperator = TServiceOperatorImpl<TScanConveyorPolicy>; +using TCompServiceOperator = TServiceOperatorImpl<TCompConveyorPolicy>; +using TInsertServiceOperator = TServiceOperatorImpl<TInsertConveyorPolicy>; + +} // namespace NKikimr::NConveyorComposite diff --git a/ydb/core/tx/conveyor_composite/usage/ya.make b/ydb/core/tx/conveyor_composite/usage/ya.make new file mode 100644 index 00000000000..5e307111624 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/usage/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +SRCS( + events.cpp + config.cpp + service.cpp + common.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/services/metadata/request +) + +GENERATE_ENUM_SERIALIZATION(common.h) + +END() diff --git a/ydb/core/tx/conveyor_composite/ut/ut_simple.cpp b/ydb/core/tx/conveyor_composite/ut/ut_simple.cpp new file mode 100644 index 00000000000..046394f3c71 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/ut/ut_simple.cpp @@ -0,0 +1,425 @@ +#include <ydb/core/tx/conveyor/usage/abstract.h> +#include <ydb/core/tx/conveyor_composite/usage/config.h> +#include <ydb/core/tx/conveyor_composite/usage/events.h> +#include <ydb/core/tx/conveyor_composite/usage/service.h> + +#include <ydb/library/actors/core/executor_pool_basic.h> +#include <ydb/library/actors/core/scheduler_basic.h> + +#include <contrib/libs/protobuf/src/google/protobuf/text_format.h> +#include <library/cpp/testing/unittest/registar.h> +#include <util/generic/xrange.h> + +using namespace NKikimr::NConveyorComposite; + +namespace NKikimr { + +THolder<TActorSystemSetup> BuildActorSystemSetup(const ui32 threads, const ui32 pools) { + Y_ABORT_UNLESS(threads > 0 && threads 
< 100); + Y_ABORT_UNLESS(pools > 0 && pools < 10); + + auto setup = MakeHolder<NActors::TActorSystemSetup>(); + + setup->NodeId = 1; + + setup->ExecutorsCount = pools; + setup->Executors.Reset(new TAutoPtr<NActors::IExecutorPool>[pools]); + for (ui32 idx : xrange(pools)) { + setup->Executors[idx] = new NActors::TBasicExecutorPool(idx, threads, 50); + } + + setup->Scheduler = new NActors::TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0)); + + return setup; +} + +} // namespace NKikimr + +class TSleepTask: public NKikimr::NConveyor::ITask { +private: + const TDuration ExecutionTime; + TAtomicCounter* Counter; + virtual void DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) override { + const TMonotonic start = TMonotonic::Now(); + while (TMonotonic::Now() - start < ExecutionTime) { + } + Counter->Inc(); + } + +public: + virtual TString GetTaskClassIdentifier() const override { + return "SLEEP"; + } + + TSleepTask(const TDuration d, TAtomicCounter& c) + : ExecutionTime(d) + , Counter(&c) { + } +}; + +class IRequestProcessor { +private: + YDB_READONLY_DEF(TString, Id); + virtual void DoInitialize(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) = 0; + virtual void DoAddTask(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) = 0; + virtual bool DoCheckFinished() = 0; + virtual void DoFinish(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId, const TDuration d) = 0; + virtual TString DoDebugString() const = 0; + +public: + void Initialize(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) { + DoInitialize(actorSystem, distributorId); + } + void AddTask(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) { + DoAddTask(actorSystem, distributorId); + } + bool CheckFinished() { + return DoCheckFinished(); + } + void Finish(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId, const TDuration d) { + DoFinish(actorSystem, distributorId, d); + } + TString DebugString() const { + return TStringBuilder() << "{" << Id << ":" << DoDebugString() << "}"; + } + IRequestProcessor(const TString& id) + : Id(id) { + } + virtual ~IRequestProcessor() = default; +}; + +class TSimpleRequest: public IRequestProcessor { +private: + YDB_ACCESSOR(double, ScopeWeight, 1); + const ESpecialTaskCategory Category; + const TString ScopeId; + const ui64 ProcessId; + TAtomicCounter Counter; + TAtomicCounter CounterTasks; + virtual TString DoDebugString() const override { + return TStringBuilder() << Counter.Val(); + } + + virtual void DoInitialize(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) override { + actorSystem.Send(distributorId, new TEvExecution::TEvRegisterProcess(TCPULimitsConfig(1000, ScopeWeight), Category, ScopeId, ProcessId)); + } + virtual void DoAddTask(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId) override { + actorSystem.Send(distributorId, + new TEvExecution::TEvNewTask(std::make_shared<TSleepTask>(TDuration::MicroSeconds(40), Counter), Category, ScopeId, ProcessId)); + CounterTasks.Inc(); + } + virtual bool DoCheckFinished() override { + return CounterTasks.Val() == Counter.Val(); + } + virtual void DoFinish(NActors::TActorSystem& actorSystem, const NActors::TActorId distributorId, const TDuration /*d*/) override { + actorSystem.Send(distributorId, new TEvExecution::TEvUnregisterProcess(Category, ScopeId, ProcessId)); + } + +public: + TSimpleRequest(const TString& id, const ESpecialTaskCategory category, 
const TString& scopeId, const ui64 processId) + : IRequestProcessor(id) + , Category(category) + , ScopeId(scopeId) + , ProcessId(processId) { + } +}; + +class TTestingExecutor { +private: + virtual TString GetConveyorConfig() = 0; + virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() = 0; + virtual ui32 GetTasksCount() const { + return 1000000; + } +public: + virtual double GetThreadsCount() const { + return 9.5; + } + + void Execute() { + const ui64 threadsCount = 64; + THolder<NActors::TActorSystemSetup> actorSystemSetup = NKikimr::BuildActorSystemSetup(threadsCount, 1); + NActors::TActorSystem actorSystem(actorSystemSetup); + + actorSystem.Start(); + auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); + const std::string textProto = GetConveyorConfig(); + NKikimrConfig::TCompositeConveyorConfig protoConfig; + AFL_VERIFY(google::protobuf::TextFormat::ParseFromString(textProto, &protoConfig)); + + NConfig::TConfig config; + config.DeserializeFromProto(protoConfig, threadsCount).Validate(); + const auto actorId = actorSystem.Register(TCompServiceOperator::CreateService(config, counters)); + + std::vector<std::shared_ptr<IRequestProcessor>> requests = GetRequests(); + for (auto&& i : requests) { + i->Initialize(actorSystem, actorId); + } + for (ui32 i = 0; i < GetTasksCount(); ++i) { + for (auto&& request : requests) { // renamed from 'i', which shadowed the outer loop counter + request->AddTask(actorSystem, actorId); + } + } + const TMonotonic globalStart = TMonotonic::Now(); + std::vector<TDuration> durations; + durations.resize(requests.size()); + { + // poll once per second until every request has drained, recording each request's completion time + bool isFinished = false; + while (!isFinished) { + isFinished = true; + ui32 idx = 0; + TStringBuilder sb; + for (auto&& i : requests) { + if (!i->CheckFinished()) { + isFinished = false; + } else if (!durations[idx]) { + durations[idx] = TMonotonic::Now() - globalStart; + } + sb << i->DebugString() << ";"; + ++idx; + } + Cerr << sb << Endl; + if (!isFinished) { + Sleep(TDuration::Seconds(1)); + } + } + } + { + ui32 idx = 0; + for (auto&& i : requests) { + i->Finish(actorSystem, actorId, durations[idx]); + ++idx; + } + } + Cerr << (GetThreadsCount() * (TMonotonic::Now() - globalStart) / (1.0 * requests.size() * GetTasksCount())).MicroSeconds() << "us per task" + << Endl; + TStringBuilder sb; + for (auto&& i : durations) { + sb << i << ";"; + } + Cerr << sb << Endl; + Sleep(TDuration::Seconds(5)); + + actorSystem.Stop(); + actorSystem.Cleanup(); + } +}; + +Y_UNIT_TEST_SUITE(CompositeConveyorTests) { + class TTestingExecutor10xDistribution: public TTestingExecutor { + private: + virtual TString GetConveyorConfig() override { + return Sprintf(R"( + WorkerPools { + 
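# note: WorkersCount is substituted via Sprintf from GetThreadsCount(); the 10x-spaced weights below (0.1 / 0.01 / 0.001) should make the three categories finish roughly 10x apart +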
WorkersCount: %f + Links { + Category: "scan" + Weight: 1 + } + } + WorkerPools { + WorkersCount: %f + Links { + Category: "insert" + Weight: 0.1 + } + Links { + Category: "scan" + Weight: 0.01 + } + Links { + Category: "normalizer" + Weight: 0.001 + } + } + Categories { + Name: "insert" + } + Categories { + Name: "normalizer" + } + Categories { + Name: "scan" + } + )", + GetThreadsCount(), GetThreadsCount()); + } + virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override { + return { std::make_shared<TSimpleRequest>("I", ESpecialTaskCategory::Insert, "1", 1), + std::make_shared<TSimpleRequest>("S", ESpecialTaskCategory::Scan, "1", 1), + std::make_shared<TSimpleRequest>("N", ESpecialTaskCategory::Normalizer, "1", 1) }; + } + + public: + }; + Y_UNIT_TEST(Test10xMultiDistribution) { + TTestingExecutor10xMultiDistribution().Execute(); + } + + class TTestingExecutorUniformProcessDistribution: public TTestingExecutor { + private: + virtual TString GetConveyorConfig() override { + return Sprintf(R"( + WorkerPools { + WorkersCount: %f + Links { + Category: "insert" + Weight: 1 + } + } + Categories { + Name: "insert" + } + )", + GetThreadsCount()); + } + virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override { + return { std::make_shared<TSimpleRequest>("1", ESpecialTaskCategory::Insert, "1", 1), + std::make_shared<TSimpleRequest>("2", ESpecialTaskCategory::Insert, "1", 2), + std::make_shared<TSimpleRequest>("3", ESpecialTaskCategory::Insert, "1", 3) }; + } + + public: + }; + Y_UNIT_TEST(TestUniformProcessDistribution) { + TTestingExecutorUniformProcessDistribution().Execute(); + } + + class TTestingExecutorUniformScopesDistribution: public TTestingExecutor { + private: + virtual TString GetConveyorConfig() override { + return Sprintf(R"( + WorkerPools { + WorkersCount: %f + Links { + Category: "insert" + Weight: 1 + } + } + Categories { + Name: "insert" + } + )", + GetThreadsCount()); + } + virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override { + return { std::make_shared<TSimpleRequest>("1", ESpecialTaskCategory::Insert, "1", 1), + std::make_shared<TSimpleRequest>("2", ESpecialTaskCategory::Insert, "2", 1), + std::make_shared<TSimpleRequest>("3", ESpecialTaskCategory::Insert, "3", 1) }; + } + + public: + }; + Y_UNIT_TEST(TestUniformScopesDistribution) { + TTestingExecutorUniformScopesDistribution().Execute(); + } + + class TTestingExecutorUniformDistribution: public TTestingExecutor { + private: + virtual ui32 GetTasksCount() const override { + return 1000000; + } + virtual double GetThreadsCount() const override { + return 16.4; + } + virtual TString GetConveyorConfig() override { + return Sprintf(R"( + WorkerPools { + WorkersCount: %f + Links { + Category: "insert" + Weight: 0.1 + } + Links { + Category: "scan" + Weight: 0.1 + } + Links { + Category: "normalizer" + Weight: 0.1 + } + } + Categories { + Name: "insert" + } + Categories { + Name: "normalizer" + } + Categories { + Name: "scan" + } + )", + GetThreadsCount()); + } + virtual std::vector<std::shared_ptr<IRequestProcessor>> GetRequests() override { + return { + std::make_shared<TSimpleRequest>("I_1_1", ESpecialTaskCategory::Insert, "1", 1), + std::make_shared<TSimpleRequest>("I_2_1", ESpecialTaskCategory::Insert, "2", 1), + std::make_shared<TSimpleRequest>("I_3_1", ESpecialTaskCategory::Insert, "3", 1), + std::make_shared<TSimpleRequest>("S_1_1", ESpecialTaskCategory::Scan, "1", 1), + std::make_shared<TSimpleRequest>("S_2_1", ESpecialTaskCategory::Scan, "2", 1), + 
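// note: ids encode Category_Scope_Process, e.g. "S_2_1" = scan category, scope "2", process 1; the 18 requests cover 3 categories x 3 scopes x 2 processes +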
std::make_shared<TSimpleRequest>("S_3_1", ESpecialTaskCategory::Scan, "3", 1), + std::make_shared<TSimpleRequest>("N_1_1", ESpecialTaskCategory::Normalizer, "1", 1), + std::make_shared<TSimpleRequest>("N_2_1", ESpecialTaskCategory::Normalizer, "2", 1), + std::make_shared<TSimpleRequest>("N_3_1", ESpecialTaskCategory::Normalizer, "3", 1), + std::make_shared<TSimpleRequest>("I_1_2", ESpecialTaskCategory::Insert, "1", 2), + std::make_shared<TSimpleRequest>("I_2_2", ESpecialTaskCategory::Insert, "2", 2), + std::make_shared<TSimpleRequest>("I_3_2", ESpecialTaskCategory::Insert, "3", 2), + std::make_shared<TSimpleRequest>("S_1_2", ESpecialTaskCategory::Scan, "1", 2), + std::make_shared<TSimpleRequest>("S_2_2", ESpecialTaskCategory::Scan, "2", 2), + std::make_shared<TSimpleRequest>("S_3_2", ESpecialTaskCategory::Scan, "3", 2), + std::make_shared<TSimpleRequest>("N_1_2", ESpecialTaskCategory::Normalizer, "1", 2), + std::make_shared<TSimpleRequest>("N_2_2", ESpecialTaskCategory::Normalizer, "2", 2), + std::make_shared<TSimpleRequest>("N_3_2", ESpecialTaskCategory::Normalizer, "3", 2) + }; + } + + public: + }; + Y_UNIT_TEST(TestUniformDistribution) { + TTestingExecutorUniformDistribution().Execute(); + } +} diff --git a/ydb/core/tx/conveyor_composite/ut/ya.make b/ydb/core/tx/conveyor_composite/ut/ya.make new file mode 100644 index 00000000000..9daa068c504 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/ut/ya.make @@ -0,0 +1,30 @@ +UNITTEST_FOR(ydb/core/tx/conveyor_composite/service) + +FORK_SUBTESTS() + +SPLIT_FACTOR(60) +SIZE(MEDIUM) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/base + ydb/core/tablet + ydb/core/tablet_flat + ydb/core/tx/columnshard/counters + yql/essentials/sql/pg_dummy + yql/essentials/core/arrow_kernels/request + ydb/core/testlib/default + ydb/core/tx/columnshard/test_helper + ydb/core/tx/columnshard/hooks/abstract + ydb/core/tx/columnshard/hooks/testing + + yql/essentials/udfs/common/json2 +) + +YQL_LAST_ABI_VERSION() + +SRCS( + ut_simple.cpp +) + +END() diff --git a/ydb/core/tx/conveyor_composite/ya.make b/ydb/core/tx/conveyor_composite/ya.make new file mode 100644 index 00000000000..efa8055bf69 --- /dev/null +++ b/ydb/core/tx/conveyor_composite/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( +) + +PEERDIR( + ydb/core/tx/conveyor_composite/service + ydb/core/tx/conveyor_composite/usage +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/core/tx/ya.make b/ydb/core/tx/ya.make index b3fcead3c1e..544315c8e56 100644 --- a/ydb/core/tx/ya.make +++ b/ydb/core/tx/ya.make @@ -24,6 +24,8 @@ END() RECURSE( balance_coverage + conveyor + conveyor_composite columnshard coordinator datashard |