diff options
author | ivanmorozov <[email protected]> | 2023-07-13 22:17:32 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-07-13 22:17:32 +0300 |
commit | 8e53f4da5c476d248b24735d4273dc98b2b13878 (patch) | |
tree | 0419e4947bd4d58c58fea9ef437d8d9c6dd57f03 | |
parent | 7b0e247e7b3c2dcd2ecb25060f4aa7eba43fec74 (diff) |
KIKIMR-18741:separate conveyor for control cpu usage
-rw-r--r-- | ydb/core/driver_lib/run/config.h | 3 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 39 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.h | 10 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 8 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 4 | ||||
-rw-r--r-- | ydb/core/protos/console_config.proto | 4 | ||||
-rw-r--r-- | ydb/core/testlib/common_helper.h | 9 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/compaction_actor.cpp | 109 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/indexing_actor.cpp | 73 | ||||
-rw-r--r-- | ydb/core/tx/conveyor/service/service.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/conveyor/service/service.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/conveyor/usage/config.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/conveyor/usage/config.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/conveyor/usage/service.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/conveyor/usage/service.h | 52 |
17 files changed, 263 insertions, 102 deletions
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 10467bd824a..75bfa640c46 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -70,7 +70,8 @@ union TBasicKikimrServicesMask { bool EnableReplicationService:1; bool EnableBackgroundTasks:1; bool EnableExternalIndex: 1; - bool EnableConveyor: 1; + bool EnableScanConveyor : 1; + bool EnableCompConveyor : 1; bool EnableLocalPgWire:1; bool EnableKafkaProxy:1; }; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 8c158728d7f..e6805b6dda3 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2113,24 +2113,49 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu } } -TConveyorInitializer::TConveyorInitializer(const TKikimrRunConfig& runConfig) +TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig) : IKikimrServicesInitializer(runConfig) { } -void TConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { +void TCompConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { NConveyor::TConfig serviceConfig; - if (Config.HasConveyorConfig()) { - Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetConveyorConfig())); + if (Config.HasCompConveyorConfig()) { + Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetCompConveyorConfig())); + if (!serviceConfig.HasDefaultFractionOfThreadsCount()) { + serviceConfig.SetDefaultFractionOfThreadsCount(0.33); + } + } + + if (serviceConfig.IsEnabled()) { + TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); + TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_COMP_CONVEYOR"); + + auto service = NConveyor::TCompServiceOperator::CreateService(serviceConfig, conveyorGroup); + + setup->LocalServices.push_back(std::make_pair( + NConveyor::TCompServiceOperator::MakeServiceId(NodeId), + TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); + } +} + +TScanConveyorInitializer::TScanConveyorInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) { +} + +void TScanConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + NConveyor::TConfig serviceConfig; + if (Config.HasScanConveyorConfig()) { + Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetScanConveyorConfig())); } if (serviceConfig.IsEnabled()) { TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); - TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_CONVEYOR"); + TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_SCAN_CONVEYOR"); - auto service = NConveyor::CreateService(serviceConfig, conveyorGroup); + auto service = NConveyor::TScanServiceOperator::CreateService(serviceConfig, conveyorGroup); setup->LocalServices.push_back(std::make_pair( - NConveyor::MakeServiceId(NodeId), + NConveyor::TScanServiceOperator::MakeServiceId(NodeId), TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); } } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index ad5aebe6dda..1b97906e253 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -389,9 +389,15 @@ private: IGlobalObjectStorage& GlobalObjects; }; -class TConveyorInitializer: public IKikimrServicesInitializer { +class TCompConveyorInitializer: public IKikimrServicesInitializer { public: - TConveyorInitializer(const TKikimrRunConfig& runConfig); + TCompConveyorInitializer(const TKikimrRunConfig& runConfig); + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + +class TScanConveyorInitializer: public IKikimrServicesInitializer { +public: + TScanConveyorInitializer(const TKikimrRunConfig& runConfig); void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; }; diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index e2c31c82619..2c0c57bc649 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1513,8 +1513,12 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig)); } - if (serviceMask.EnableConveyor) { - sil->AddServiceInitializer(new TConveyorInitializer(runConfig)); + if (serviceMask.EnableScanConveyor) { + sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig)); + } + + if (serviceMask.EnableCompConveyor) { + sil->AddServiceInitializer(new TCompConveyorInitializer(runConfig)); } if (serviceMask.EnableBackgroundTasks) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index dac6cd53296..6f7dc30b23a 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -610,6 +610,7 @@ message TConveyorConfig { optional bool Enabled = 1 [default = true]; optional uint32 WorkersCount = 2; optional uint32 QueueSizeLimit = 3; + optional double DefaultFractionOfThreadsCount = 4; } message TExternalIndexConfig { @@ -1927,11 +1928,12 @@ message TAppConfig { optional TClientCertificateAuthorization ClientCertificateAuthorization = 62; optional TExternalIndexConfig ExternalIndexConfig = 63; optional bool YamlConfigEnabled = 64; - optional TConveyorConfig ConveyorConfig = 65; + optional TConveyorConfig ScanConveyorConfig = 65; optional TColumnShardConfig ColumnShardConfig = 66; optional TLocalPgWireConfig LocalPgWireConfig = 69; optional TAwsCompatibilityConfig AwsCompatibilityConfig = 70; optional TKafkaProxyConfig KafkaProxyConfig = 71; + optional TConveyorConfig CompConveyorConfig = 72; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto index 900978eadb1..3b37c9763ca 100644 --- a/ydb/core/protos/console_config.proto +++ b/ydb/core/protos/console_config.proto @@ -121,9 +121,11 @@ message TConfigItem { BackgroundTasksConfigItem = 60; ExternalIndexConfigItem = 63; YamlConfigEnabledItem = 64; - ConveyorConfigItem = 65; + ScanConveyorConfigItem = 65; ColumnShardConfigItem = 66; AwsCompatibilityConfigItem = 70; + KafkaProxyConfig = 71; + CompConveyorConfigItem = 72; NamedConfigsItem = 100; ClusterYamlConfigItem = 101; diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h index 34175c3c7d0..d8b7c55b044 100644 --- a/ydb/core/testlib/common_helper.h +++ b/ydb/core/testlib/common_helper.h @@ -29,14 +29,17 @@ public: ~TLoggerInit() { Initialize(); } - void SetComponents(const std::vector<NKikimrServices::EServiceKikimr> services) { + TLoggerInit& SetComponents(const std::vector<NKikimrServices::EServiceKikimr> services) { Services = { services }; + return *this; } - void AddComponents(const std::vector<NKikimrServices::EServiceKikimr> services) { + TLoggerInit& AddComponents(const std::vector<NKikimrServices::EServiceKikimr> services) { Services.emplace_back(services); + return *this; } - void SetPriority(const NActors::NLog::EPriority priority) { + TLoggerInit& SetPriority(const NActors::NLog::EPriority priority) { Priority = priority; + return *this; } }; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 0e7c570ad83..1538aef1169 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -734,12 +734,12 @@ namespace Tests { } if (Settings->IsEnableMetadataProvider()) { auto* actor = NMetadata::NProvider::CreateService(NMetadata::NProvider::TConfig()); - const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NMetadata::NProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableBackgroundTasks()) { auto* actor = NBackgroundTasks::CreateService(NBackgroundTasks::TConfig()); - const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableExternalIndex()) { @@ -748,9 +748,14 @@ namespace Tests { Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } { - auto* actor = NConveyor::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters()); + auto* actor = NConveyor::TScanServiceOperator::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters()); + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); + Runtime->RegisterService(NConveyor::TScanServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); + } + { + auto* actor = NConveyor::TCompServiceOperator::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters()); const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); - Runtime->RegisterService(NConveyor::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); + Runtime->RegisterService(NConveyor::TCompServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index e63df783dae..055835a161d 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -57,7 +57,7 @@ private: const TActorIdentity OwnerActorId; protected: virtual bool DoAdd(IDataTasksProcessor::ITask::TPtr task) override { - OwnerActorId.Send(NConveyor::MakeServiceId(OwnerActorId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task)); + OwnerActorId.Send(NConveyor::TScanServiceOperator::MakeServiceId(OwnerActorId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task)); return true; } public: @@ -119,7 +119,7 @@ public: private: IDataTasksProcessor::TPtr MakeTasksProcessor() const { - if (NConveyor::TServiceOperator::IsEnabled()) { + if (NConveyor::TScanServiceOperator::IsEnabled()) { return std::make_shared<TLocalDataTasksProcessor>(SelfId()); } else { return nullptr; diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index f7217a399b8..306cb9dc865 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -3,6 +3,8 @@ #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> #include <ydb/core/tx/columnshard/engines/index_logic_logs.h> +#include <ydb/core/tx/conveyor/usage/events.h> +#include <ydb/core/tx/conveyor/usage/service.h> using NKikimr::NOlap::TBlobRange; @@ -24,7 +26,7 @@ public: , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) { } - void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& /*ctx*/) { + void Handle(TEvPrivate::TEvCompaction::TPtr& ev) { Y_VERIFY(!TxEvent); Y_VERIFY(Blobs.empty() && !NumRead); LastActivationTime = TAppData::TimeProvider->Now(); @@ -50,7 +52,7 @@ public: } } - void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) { + void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) { LOG_S_TRACE("TEvReadBlobRangeResult (got " << NumRead << " of " << Blobs.size() << ") at tablet " << TabletId << " (compaction)"); @@ -84,20 +86,21 @@ public: } if (++NumRead == Blobs.size()) { - CompactGranules(ctx); + CompactGranules(); Clear(); } } - void Bootstrap(const TActorContext& ctx) { - Y_UNUSED(ctx); + void Bootstrap() { Become(&TThis::StateWait); } STFUNC(StateWait) { + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)); switch (ev->GetTypeRewrite()) { - HFunc(TEvPrivate::TEvCompaction, Handle); - HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle); + hFunc(TEvPrivate::TEvCompaction, Handle); + hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle); + hFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, Handle) default: break; } @@ -134,33 +137,65 @@ private: Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), std::move(readOpts))); } - void CompactGranules(const TActorContext& ctx) { + class TConveyorTask: public NConveyor::ITask { + private: + std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent; + const TIndexationCounters Counters; + protected: + virtual bool DoExecute() override { + TCpuGuard guard(TxEvent->ResourceUsage); + + NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters); + TxEvent->Blobs = std::move(compactionLogic.Apply(TxEvent->IndexChanges).DetachResult()); + return true; + } + public: + std::unique_ptr<TEvPrivate::TEvWriteIndex> ExtractEvent() { + Y_VERIFY(TxEvent); + return std::move(TxEvent); + } + + TConveyorTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& txEvent, const TIndexationCounters& counters) + : TxEvent(std::move(txEvent)) + , Counters(counters) + { + Y_VERIFY(TxEvent); + } + }; + + void CompactGranules() { Y_VERIFY(TxEvent); if (TxEvent->GetPutStatus() != NKikimrProto::EReplyStatus::UNKNOWN) { LOG_S_INFO("Granules compaction not started at tablet " << TabletId); - ctx.Send(Parent, TxEvent.release()); + Send(Parent, TxEvent.release()); return; } - LOG_S_DEBUG("Granules compaction started at tablet " << TabletId); + TxEvent->IndexChanges->SetBlobs(std::move(Blobs)); { - TCpuGuard guard(TxEvent->ResourceUsage); + std::shared_ptr<TConveyorTask> task = std::make_shared<TConveyorTask>(std::move(TxEvent), GetCurrentCounters()); + NConveyor::TCompServiceOperator::SendTaskToExecute(task); + } + } - TxEvent->IndexChanges->SetBlobs(std::move(Blobs)); - { - NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, GetCurrentCounters()); - TxEvent->Blobs = std::move(compactionLogic.Apply(TxEvent->IndexChanges).DetachResult()); - } - if (TxEvent->Blobs.empty()) { - TxEvent->SetPutStatus(NKikimrProto::OK); // nothing to write, commit + void Handle(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) { + auto t = static_pointer_cast<TConveyorTask>(ev->Get()->GetResult()); + Y_VERIFY_DEBUG(dynamic_pointer_cast<TConveyorTask>(ev->Get()->GetResult())); + auto txEvent = t->ExtractEvent(); + if (t->HasError()) { + ACFL_ERROR("event", "task_error")("message", ev->Get()->GetErrorMessage()); + txEvent->SetPutStatus(NKikimrProto::ERROR); + Send(Parent, txEvent.release()); + } else { + if (txEvent->Blobs.empty()) { + txEvent->SetPutStatus(NKikimrProto::OK); // nothing to write, commit } - } - TxEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime; - ui32 blobsSize = TxEvent->Blobs.size(); - ctx.Send(Parent, TxEvent.release()); + txEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime; + ui32 blobsSize = txEvent->Blobs.size(); + Send(Parent, txEvent.release()); - LOG_S_DEBUG("Granules compaction finished (" << blobsSize << " new blobs) at tablet " << TabletId); - // Die(ctx); // It's alive till tablet's death + LOG_S_DEBUG("Granules compaction finished (" << blobsSize << " new blobs) at tablet " << TabletId); + } } }; @@ -173,39 +208,39 @@ public: { } - void Bootstrap(const TActorContext& ctx) { + void Bootstrap() { Become(&TThis::StateWait); for (auto& worker : Idle) { - worker = ctx.Register(new TCompactionActor(TabletId, ctx.SelfID)); + worker = Register(new TCompactionActor(TabletId, SelfId())); } } STFUNC(StateWait) { switch (ev->GetTypeRewrite()) { - HFunc(TEvents::TEvPoisonPill, Handle); - HFunc(TEvPrivate::TEvWriteIndex, Handle); - HFunc(TEvPrivate::TEvCompaction, Handle); + hFunc(TEvents::TEvPoisonPill, Handle); + hFunc(TEvPrivate::TEvWriteIndex, Handle); + hFunc(TEvPrivate::TEvCompaction, Handle); default: break; } } private: - void Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { + void Handle(TEvents::TEvPoisonPill::TPtr&) { for (const auto& worker : Active) { - ctx.Send(worker, new TEvents::TEvPoisonPill); + Send(worker, new TEvents::TEvPoisonPill); } for (const auto& worker : Idle) { - ctx.Send(worker, new TEvents::TEvPoisonPill); + Send(worker, new TEvents::TEvPoisonPill); } - Die(ctx); + PassAway(); } - void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev) { if (auto ai = Active.find(ev->Sender); ai != Active.end()) { - ctx.Send(ev->Forward(Parent)); + Send(ev->Forward(Parent)); Idle.push_back(*ai); Active.erase(ai); } else { @@ -213,10 +248,10 @@ private: } } - void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvPrivate::TEvCompaction::TPtr& ev) { Y_VERIFY(!Idle.empty()); - ctx.Send(ev->Forward(Idle.back())); + Send(ev->Forward(Idle.back())); Active.insert(Idle.back()); Idle.pop_back(); diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index 5d48679f002..f260225b840 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -1,7 +1,9 @@ +#include "blob_cache.h" #include "columnshard_impl.h" #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> #include <ydb/core/tx/columnshard/engines/index_logic_logs.h> -#include "blob_cache.h" +#include <ydb/core/tx/conveyor/usage/events.h> +#include <ydb/core/tx/conveyor/usage/service.h> namespace NKikimr::NColumnShard { namespace { @@ -21,7 +23,7 @@ public: , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) {} - void Handle(TEvPrivate::TEvIndexing::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvPrivate::TEvIndexing::TPtr& ev) { LOG_S_DEBUG("TEvIndexing at tablet " << TabletId << " (index)"); LastActivationTime = TAppData::TimeProvider->Now(); @@ -46,11 +48,11 @@ public: } if (BlobsToRead.empty()) { - Index(ctx); + Index(); } } - void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) { + void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) { LOG_S_TRACE("TEvReadBlobRangeResult (waiting " << BlobsToRead.size() << ") at tablet " << TabletId << " (index)"); @@ -87,19 +89,20 @@ public: indexChanges->Blobs[event.BlobRange] = blobData; if (BlobsToRead.empty()) { - Index(ctx); + Index(); } } - void Bootstrap(const TActorContext& ctx) { - Y_UNUSED(ctx); + void Bootstrap() { Become(&TThis::StateWait); } STFUNC(StateWait) { + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)); switch (ev->GetTypeRewrite()) { - HFunc(TEvPrivate::TEvIndexing, Handle); - HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle); + hFunc(TEvPrivate::TEvIndexing, Handle); + hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle); + hFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, Handle) default: break; } @@ -124,22 +127,56 @@ private: Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts))); } - void Index(const TActorContext& ctx) { - Y_VERIFY(TxEvent); - if (TxEvent->GetPutStatus() == NKikimrProto::UNKNOWN) { - LOG_S_DEBUG("Indexing started at tablet " << TabletId); - + class TConveyorTask: public NConveyor::ITask { + private: + std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent; + const TIndexationCounters Counters; + protected: + virtual bool DoExecute() override { TCpuGuard guard(TxEvent->ResourceUsage); + NOlap::TIndexationLogic indexationLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters); TxEvent->Blobs = std::move(indexationLogic.Apply(TxEvent->IndexChanges).DetachResult()); - LOG_S_DEBUG("Indexing finished at tablet " << TabletId); + return true; + } + public: + std::unique_ptr<TEvPrivate::TEvWriteIndex> ExtractEvent() { + Y_VERIFY(TxEvent); + return std::move(TxEvent); + } + + TConveyorTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& txEvent, const TIndexationCounters& counters) + : TxEvent(std::move(txEvent)) + , Counters(counters) + { + Y_VERIFY(TxEvent); + } + }; + + void Index() { + Y_VERIFY(TxEvent); + if (TxEvent->GetPutStatus() == NKikimrProto::UNKNOWN) { + LOG_S_DEBUG("Indexing started at tablet " << TabletId); + std::shared_ptr<TConveyorTask> task = std::make_shared<TConveyorTask>(std::move(TxEvent), Counters); + NConveyor::TCompServiceOperator::SendTaskToExecute(task); } else { LOG_S_ERROR("Indexing failed at tablet " << TabletId); + TxEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime; + Send(Parent, TxEvent.release()); } + } - TxEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime; - ctx.Send(Parent, TxEvent.release()); - //Die(ctx); // It's alive till tablet's death + void Handle(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) { + auto t = static_pointer_cast<TConveyorTask>(ev->Get()->GetResult()); + Y_VERIFY_DEBUG(dynamic_pointer_cast<TConveyorTask>(ev->Get()->GetResult())); + auto txEvent = t->ExtractEvent(); + if (t->HasError()) { + ACFL_ERROR("event", "task_error")("message", t->GetErrorMessage()); + } else { + LOG_S_DEBUG("Indexing finished at tablet " << TabletId); + } + txEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime; + Send(Parent, txEvent.release()); } }; diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp index 9d65cfb7387..12a4815f4ca 100644 --- a/ydb/core/tx/conveyor/service/service.cpp +++ b/ydb/core/tx/conveyor/service/service.cpp @@ -4,10 +4,6 @@ namespace NKikimr::NConveyor { -NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) { - return new TDistributor(config, "common", conveyorSignals); -} - TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) : Config(config) , ConveyorName(conveyorName) @@ -17,9 +13,8 @@ TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, T } void TDistributor::Bootstrap() { - const ui32 workersCount = Config.GetWorkersCountDef(NKqp::TStagePredictor::GetUsableThreads()); + const ui32 workersCount = Config.GetWorkersCountForConveyor(NKqp::TStagePredictor::GetUsableThreads()); ALS_NOTICE(NKikimrServices::TX_CONVEYOR) << "action=conveyor_registered;actor_id=" << SelfId() << ";workers_count=" << workersCount << ";limit=" << Config.GetQueueSizeLimit(); - TServiceOperator::Register(Config); for (ui32 i = 0; i < workersCount; ++i) { Workers.emplace_back(Register(new TWorker())); } diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h index 7bfa88d9708..0ab8a9ea838 100644 --- a/ydb/core/tx/conveyor/service/service.h +++ b/ydb/core/tx/conveyor/service/service.h @@ -70,6 +70,4 @@ public: void Bootstrap(); }; -NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals); - } diff --git a/ydb/core/tx/conveyor/usage/config.cpp b/ydb/core/tx/conveyor/usage/config.cpp index b73683cd2ac..39ae3803582 100644 --- a/ydb/core/tx/conveyor/usage/config.cpp +++ b/ydb/core/tx/conveyor/usage/config.cpp @@ -14,6 +14,20 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config) if (config.HasWorkersCount()) { WorkersCount = config.GetWorkersCount(); } + if (config.HasDefaultFractionOfThreadsCount()) { + DefaultFractionOfThreadsCount = config.GetDefaultFractionOfThreadsCount(); + } return true; } + +ui32 TConfig::GetWorkersCountForConveyor(const ui32 poolThreadsCount) const { + if (WorkersCount) { + return *WorkersCount; + } else if (DefaultFractionOfThreadsCount) { + return Max<ui32>(1, *DefaultFractionOfThreadsCount * poolThreadsCount); + } else { + return poolThreadsCount; + } +} + } diff --git a/ydb/core/tx/conveyor/usage/config.h b/ydb/core/tx/conveyor/usage/config.h index dbce08107ce..45d78d0cebd 100644 --- a/ydb/core/tx/conveyor/usage/config.h +++ b/ydb/core/tx/conveyor/usage/config.h @@ -9,8 +9,10 @@ private: YDB_OPT(ui32, WorkersCount); YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024); YDB_READONLY_FLAG(Enabled, true); + YDB_OPT(double, DefaultFractionOfThreadsCount); public: bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config); + ui32 GetWorkersCountForConveyor(const ui32 poolThreadsCount) const; }; } diff --git a/ydb/core/tx/conveyor/usage/service.cpp b/ydb/core/tx/conveyor/usage/service.cpp index 71416b06880..c4e435d65f4 100644 --- a/ydb/core/tx/conveyor/usage/service.cpp +++ b/ydb/core/tx/conveyor/usage/service.cpp @@ -2,16 +2,4 @@ namespace NKikimr::NConveyor { -bool TServiceOperator::IsEnabled() { - return Singleton<TServiceOperator>()->IsEnabledFlag; -} - -void TServiceOperator::Register(const TConfig& serviceConfig) { - Singleton<TServiceOperator>()->IsEnabledFlag = serviceConfig.IsEnabled(); -} - -NActors::TActorId MakeServiceId(const ui32 nodeId) { - return NActors::TActorId(nodeId, "SrvcConveyor"); -} - } diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h index 91d0d797fe4..e015e2538e9 100644 --- a/ydb/core/tx/conveyor/usage/service.h +++ b/ydb/core/tx/conveyor/usage/service.h @@ -1,17 +1,61 @@ #pragma once #include "config.h" #include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/tx/conveyor/service/service.h> +#include <ydb/core/tx/conveyor/usage/events.h> namespace NKikimr::NConveyor { -class TServiceOperator { +template <class TConveyorPolicy> +class TServiceOperatorImpl { private: + using TSelf = TServiceOperatorImpl<TConveyorPolicy>; std::atomic<bool> IsEnabledFlag = false; + static void Register(const TConfig& serviceConfig) { + Singleton<TSelf>()->IsEnabledFlag = serviceConfig.IsEnabled(); + } + static const TString& GetConveyorName() { + Y_VERIFY(TConveyorPolicy::Name.size() == 4); + return TConveyorPolicy::Name; + } public: - static bool IsEnabled(); - static void Register(const TConfig& serviceConfig); + static bool SendTaskToExecute(const std::shared_ptr<ITask>& task) { + auto& context = NActors::TActorContext::AsActorContext(); + const NActors::TActorId& selfId = context.SelfID; + if (TSelf::IsEnabled()) { + context.Send(MakeServiceId(selfId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task)); + return true; + } else { + task->Execute(); + context.Send(selfId, new NConveyor::TEvExecution::TEvTaskProcessedResult(task)); + return false; + } + } + static bool IsEnabled() { + return Singleton<TSelf>()->IsEnabledFlag; + } + static NActors::TActorId MakeServiceId(const ui32 nodeId) { + return NActors::TActorId(nodeId, "SrvcConv" + GetConveyorName()); + } + static NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) { + Register(config); + return new TDistributor(config, GetConveyorName(), conveyorSignals); + } + +}; + +class TScanConveyorPolicy { +public: + static const inline TString Name = "Scan"; +}; + +class TCompConveyorPolicy { +public: + static const inline TString Name = "Comp"; }; -NActors::TActorId MakeServiceId(const ui32 nodeId); +using TScanServiceOperator = TServiceOperatorImpl<TScanConveyorPolicy>; +using TCompServiceOperator = TServiceOperatorImpl<TCompConveyorPolicy>; } |