summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-07-13 22:17:32 +0300
committerivanmorozov <[email protected]>2023-07-13 22:17:32 +0300
commit8e53f4da5c476d248b24735d4273dc98b2b13878 (patch)
tree0419e4947bd4d58c58fea9ef437d8d9c6dd57f03
parent7b0e247e7b3c2dcd2ecb25060f4aa7eba43fec74 (diff)
KIKIMR-18741:separate conveyor for control cpu usage
-rw-r--r--ydb/core/driver_lib/run/config.h3
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp39
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h10
-rw-r--r--ydb/core/driver_lib/run/run.cpp8
-rw-r--r--ydb/core/protos/config.proto4
-rw-r--r--ydb/core/protos/console_config.proto4
-rw-r--r--ydb/core/testlib/common_helper.h9
-rw-r--r--ydb/core/testlib/test_client.cpp13
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp4
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp109
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp73
-rw-r--r--ydb/core/tx/conveyor/service/service.cpp7
-rw-r--r--ydb/core/tx/conveyor/service/service.h2
-rw-r--r--ydb/core/tx/conveyor/usage/config.cpp14
-rw-r--r--ydb/core/tx/conveyor/usage/config.h2
-rw-r--r--ydb/core/tx/conveyor/usage/service.cpp12
-rw-r--r--ydb/core/tx/conveyor/usage/service.h52
17 files changed, 263 insertions, 102 deletions
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index 10467bd824a..75bfa640c46 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -70,7 +70,8 @@ union TBasicKikimrServicesMask {
bool EnableReplicationService:1;
bool EnableBackgroundTasks:1;
bool EnableExternalIndex: 1;
- bool EnableConveyor: 1;
+ bool EnableScanConveyor : 1;
+ bool EnableCompConveyor : 1;
bool EnableLocalPgWire:1;
bool EnableKafkaProxy:1;
};
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 8c158728d7f..e6805b6dda3 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2113,24 +2113,49 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}
-TConveyorInitializer::TConveyorInitializer(const TKikimrRunConfig& runConfig)
+TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}
-void TConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+void TCompConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
NConveyor::TConfig serviceConfig;
- if (Config.HasConveyorConfig()) {
- Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetConveyorConfig()));
+ if (Config.HasCompConveyorConfig()) {
+ Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetCompConveyorConfig()));
+ if (!serviceConfig.HasDefaultFractionOfThreadsCount()) {
+ serviceConfig.SetDefaultFractionOfThreadsCount(0.33);
+ }
+ }
+
+ if (serviceConfig.IsEnabled()) {
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_COMP_CONVEYOR");
+
+ auto service = NConveyor::TCompServiceOperator::CreateService(serviceConfig, conveyorGroup);
+
+ setup->LocalServices.push_back(std::make_pair(
+ NConveyor::TCompServiceOperator::MakeServiceId(NodeId),
+ TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
+ }
+}
+
+TScanConveyorInitializer::TScanConveyorInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig) {
+}
+
+void TScanConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ NConveyor::TConfig serviceConfig;
+ if (Config.HasScanConveyorConfig()) {
+ Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetScanConveyorConfig()));
}
if (serviceConfig.IsEnabled()) {
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
- TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_CONVEYOR");
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_SCAN_CONVEYOR");
- auto service = NConveyor::CreateService(serviceConfig, conveyorGroup);
+ auto service = NConveyor::TScanServiceOperator::CreateService(serviceConfig, conveyorGroup);
setup->LocalServices.push_back(std::make_pair(
- NConveyor::MakeServiceId(NodeId),
+ NConveyor::TScanServiceOperator::MakeServiceId(NodeId),
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
}
}
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index ad5aebe6dda..1b97906e253 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -389,9 +389,15 @@ private:
IGlobalObjectStorage& GlobalObjects;
};
-class TConveyorInitializer: public IKikimrServicesInitializer {
+class TCompConveyorInitializer: public IKikimrServicesInitializer {
public:
- TConveyorInitializer(const TKikimrRunConfig& runConfig);
+ TCompConveyorInitializer(const TKikimrRunConfig& runConfig);
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
+class TScanConveyorInitializer: public IKikimrServicesInitializer {
+public:
+ TScanConveyorInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index e2c31c82619..2c0c57bc649 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1513,8 +1513,12 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig));
}
- if (serviceMask.EnableConveyor) {
- sil->AddServiceInitializer(new TConveyorInitializer(runConfig));
+ if (serviceMask.EnableScanConveyor) {
+ sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
+ }
+
+ if (serviceMask.EnableCompConveyor) {
+ sil->AddServiceInitializer(new TCompConveyorInitializer(runConfig));
}
if (serviceMask.EnableBackgroundTasks) {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index dac6cd53296..6f7dc30b23a 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -610,6 +610,7 @@ message TConveyorConfig {
optional bool Enabled = 1 [default = true];
optional uint32 WorkersCount = 2;
optional uint32 QueueSizeLimit = 3;
+ optional double DefaultFractionOfThreadsCount = 4;
}
message TExternalIndexConfig {
@@ -1927,11 +1928,12 @@ message TAppConfig {
optional TClientCertificateAuthorization ClientCertificateAuthorization = 62;
optional TExternalIndexConfig ExternalIndexConfig = 63;
optional bool YamlConfigEnabled = 64;
- optional TConveyorConfig ConveyorConfig = 65;
+ optional TConveyorConfig ScanConveyorConfig = 65;
optional TColumnShardConfig ColumnShardConfig = 66;
optional TLocalPgWireConfig LocalPgWireConfig = 69;
optional TAwsCompatibilityConfig AwsCompatibilityConfig = 70;
optional TKafkaProxyConfig KafkaProxyConfig = 71;
+ optional TConveyorConfig CompConveyorConfig = 72;
repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto
index 900978eadb1..3b37c9763ca 100644
--- a/ydb/core/protos/console_config.proto
+++ b/ydb/core/protos/console_config.proto
@@ -121,9 +121,11 @@ message TConfigItem {
BackgroundTasksConfigItem = 60;
ExternalIndexConfigItem = 63;
YamlConfigEnabledItem = 64;
- ConveyorConfigItem = 65;
+ ScanConveyorConfigItem = 65;
ColumnShardConfigItem = 66;
AwsCompatibilityConfigItem = 70;
+ KafkaProxyConfig = 71;
+ CompConveyorConfigItem = 72;
NamedConfigsItem = 100;
ClusterYamlConfigItem = 101;
diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h
index 34175c3c7d0..d8b7c55b044 100644
--- a/ydb/core/testlib/common_helper.h
+++ b/ydb/core/testlib/common_helper.h
@@ -29,14 +29,17 @@ public:
~TLoggerInit() {
Initialize();
}
- void SetComponents(const std::vector<NKikimrServices::EServiceKikimr> services) {
+ TLoggerInit& SetComponents(const std::vector<NKikimrServices::EServiceKikimr> services) {
Services = { services };
+ return *this;
}
- void AddComponents(const std::vector<NKikimrServices::EServiceKikimr> services) {
+ TLoggerInit& AddComponents(const std::vector<NKikimrServices::EServiceKikimr> services) {
Services.emplace_back(services);
+ return *this;
}
- void SetPriority(const NActors::NLog::EPriority priority) {
+ TLoggerInit& SetPriority(const NActors::NLog::EPriority priority) {
Priority = priority;
+ return *this;
}
};
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 0e7c570ad83..1538aef1169 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -734,12 +734,12 @@ namespace Tests {
}
if (Settings->IsEnableMetadataProvider()) {
auto* actor = NMetadata::NProvider::CreateService(NMetadata::NProvider::TConfig());
- const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NMetadata::NProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
if (Settings->IsEnableBackgroundTasks()) {
auto* actor = NBackgroundTasks::CreateService(NBackgroundTasks::TConfig());
- const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
if (Settings->IsEnableExternalIndex()) {
@@ -748,9 +748,14 @@ namespace Tests {
Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
{
- auto* actor = NConveyor::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters());
+ auto* actor = NConveyor::TScanServiceOperator::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters());
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
+ Runtime->RegisterService(NConveyor::TScanServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
+ }
+ {
+ auto* actor = NConveyor::TCompServiceOperator::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters());
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
- Runtime->RegisterService(NConveyor::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
+ Runtime->RegisterService(NConveyor::TCompServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index e63df783dae..055835a161d 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -57,7 +57,7 @@ private:
const TActorIdentity OwnerActorId;
protected:
virtual bool DoAdd(IDataTasksProcessor::ITask::TPtr task) override {
- OwnerActorId.Send(NConveyor::MakeServiceId(OwnerActorId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task));
+ OwnerActorId.Send(NConveyor::TScanServiceOperator::MakeServiceId(OwnerActorId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task));
return true;
}
public:
@@ -119,7 +119,7 @@ public:
private:
IDataTasksProcessor::TPtr MakeTasksProcessor() const {
- if (NConveyor::TServiceOperator::IsEnabled()) {
+ if (NConveyor::TScanServiceOperator::IsEnabled()) {
return std::make_shared<TLocalDataTasksProcessor>(SelfId());
} else {
return nullptr;
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index f7217a399b8..306cb9dc865 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -3,6 +3,8 @@
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
+#include <ydb/core/tx/conveyor/usage/events.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
using NKikimr::NOlap::TBlobRange;
@@ -24,7 +26,7 @@ public:
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) {
}
- void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& /*ctx*/) {
+ void Handle(TEvPrivate::TEvCompaction::TPtr& ev) {
Y_VERIFY(!TxEvent);
Y_VERIFY(Blobs.empty() && !NumRead);
LastActivationTime = TAppData::TimeProvider->Now();
@@ -50,7 +52,7 @@ public:
}
}
- void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) {
+ void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) {
LOG_S_TRACE("TEvReadBlobRangeResult (got " << NumRead << " of " << Blobs.size() << ") at tablet " << TabletId
<< " (compaction)");
@@ -84,20 +86,21 @@ public:
}
if (++NumRead == Blobs.size()) {
- CompactGranules(ctx);
+ CompactGranules();
Clear();
}
}
- void Bootstrap(const TActorContext& ctx) {
- Y_UNUSED(ctx);
+ void Bootstrap() {
Become(&TThis::StateWait);
}
STFUNC(StateWait) {
+ TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId));
switch (ev->GetTypeRewrite()) {
- HFunc(TEvPrivate::TEvCompaction, Handle);
- HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+ hFunc(TEvPrivate::TEvCompaction, Handle);
+ hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+ hFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, Handle)
default:
break;
}
@@ -134,33 +137,65 @@ private:
Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRangeBatch(std::move(ranges), std::move(readOpts)));
}
- void CompactGranules(const TActorContext& ctx) {
+ class TConveyorTask: public NConveyor::ITask {
+ private:
+ std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
+ const TIndexationCounters Counters;
+ protected:
+ virtual bool DoExecute() override {
+ TCpuGuard guard(TxEvent->ResourceUsage);
+
+ NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+ TxEvent->Blobs = std::move(compactionLogic.Apply(TxEvent->IndexChanges).DetachResult());
+ return true;
+ }
+ public:
+ std::unique_ptr<TEvPrivate::TEvWriteIndex> ExtractEvent() {
+ Y_VERIFY(TxEvent);
+ return std::move(TxEvent);
+ }
+
+ TConveyorTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& txEvent, const TIndexationCounters& counters)
+ : TxEvent(std::move(txEvent))
+ , Counters(counters)
+ {
+ Y_VERIFY(TxEvent);
+ }
+ };
+
+ void CompactGranules() {
Y_VERIFY(TxEvent);
if (TxEvent->GetPutStatus() != NKikimrProto::EReplyStatus::UNKNOWN) {
LOG_S_INFO("Granules compaction not started at tablet " << TabletId);
- ctx.Send(Parent, TxEvent.release());
+ Send(Parent, TxEvent.release());
return;
}
-
LOG_S_DEBUG("Granules compaction started at tablet " << TabletId);
+ TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
{
- TCpuGuard guard(TxEvent->ResourceUsage);
+ std::shared_ptr<TConveyorTask> task = std::make_shared<TConveyorTask>(std::move(TxEvent), GetCurrentCounters());
+ NConveyor::TCompServiceOperator::SendTaskToExecute(task);
+ }
+ }
- TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
- {
- NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, GetCurrentCounters());
- TxEvent->Blobs = std::move(compactionLogic.Apply(TxEvent->IndexChanges).DetachResult());
- }
- if (TxEvent->Blobs.empty()) {
- TxEvent->SetPutStatus(NKikimrProto::OK); // nothing to write, commit
+ void Handle(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
+ auto t = static_pointer_cast<TConveyorTask>(ev->Get()->GetResult());
+ Y_VERIFY_DEBUG(dynamic_pointer_cast<TConveyorTask>(ev->Get()->GetResult()));
+ auto txEvent = t->ExtractEvent();
+ if (t->HasError()) {
+ ACFL_ERROR("event", "task_error")("message", ev->Get()->GetErrorMessage());
+ txEvent->SetPutStatus(NKikimrProto::ERROR);
+ Send(Parent, txEvent.release());
+ } else {
+ if (txEvent->Blobs.empty()) {
+ txEvent->SetPutStatus(NKikimrProto::OK); // nothing to write, commit
}
- }
- TxEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime;
- ui32 blobsSize = TxEvent->Blobs.size();
- ctx.Send(Parent, TxEvent.release());
+ txEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime;
+ ui32 blobsSize = txEvent->Blobs.size();
+ Send(Parent, txEvent.release());
- LOG_S_DEBUG("Granules compaction finished (" << blobsSize << " new blobs) at tablet " << TabletId);
- // Die(ctx); // It's alive till tablet's death
+ LOG_S_DEBUG("Granules compaction finished (" << blobsSize << " new blobs) at tablet " << TabletId);
+ }
}
};
@@ -173,39 +208,39 @@ public:
{
}
- void Bootstrap(const TActorContext& ctx) {
+ void Bootstrap() {
Become(&TThis::StateWait);
for (auto& worker : Idle) {
- worker = ctx.Register(new TCompactionActor(TabletId, ctx.SelfID));
+ worker = Register(new TCompactionActor(TabletId, SelfId()));
}
}
STFUNC(StateWait) {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvents::TEvPoisonPill, Handle);
- HFunc(TEvPrivate::TEvWriteIndex, Handle);
- HFunc(TEvPrivate::TEvCompaction, Handle);
+ hFunc(TEvents::TEvPoisonPill, Handle);
+ hFunc(TEvPrivate::TEvWriteIndex, Handle);
+ hFunc(TEvPrivate::TEvCompaction, Handle);
default:
break;
}
}
private:
- void Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
+ void Handle(TEvents::TEvPoisonPill::TPtr&) {
for (const auto& worker : Active) {
- ctx.Send(worker, new TEvents::TEvPoisonPill);
+ Send(worker, new TEvents::TEvPoisonPill);
}
for (const auto& worker : Idle) {
- ctx.Send(worker, new TEvents::TEvPoisonPill);
+ Send(worker, new TEvents::TEvPoisonPill);
}
- Die(ctx);
+ PassAway();
}
- void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev) {
if (auto ai = Active.find(ev->Sender); ai != Active.end()) {
- ctx.Send(ev->Forward(Parent));
+ Send(ev->Forward(Parent));
Idle.push_back(*ai);
Active.erase(ai);
} else {
@@ -213,10 +248,10 @@ private:
}
}
- void Handle(TEvPrivate::TEvCompaction::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvPrivate::TEvCompaction::TPtr& ev) {
Y_VERIFY(!Idle.empty());
- ctx.Send(ev->Forward(Idle.back()));
+ Send(ev->Forward(Idle.back()));
Active.insert(Idle.back());
Idle.pop_back();
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index 5d48679f002..f260225b840 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -1,7 +1,9 @@
+#include "blob_cache.h"
#include "columnshard_impl.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
-#include "blob_cache.h"
+#include <ydb/core/tx/conveyor/usage/events.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
namespace NKikimr::NColumnShard {
namespace {
@@ -21,7 +23,7 @@ public:
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
{}
- void Handle(TEvPrivate::TEvIndexing::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvPrivate::TEvIndexing::TPtr& ev) {
LOG_S_DEBUG("TEvIndexing at tablet " << TabletId << " (index)");
LastActivationTime = TAppData::TimeProvider->Now();
@@ -46,11 +48,11 @@ public:
}
if (BlobsToRead.empty()) {
- Index(ctx);
+ Index();
}
}
- void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) {
+ void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) {
LOG_S_TRACE("TEvReadBlobRangeResult (waiting " << BlobsToRead.size()
<< ") at tablet " << TabletId << " (index)");
@@ -87,19 +89,20 @@ public:
indexChanges->Blobs[event.BlobRange] = blobData;
if (BlobsToRead.empty()) {
- Index(ctx);
+ Index();
}
}
- void Bootstrap(const TActorContext& ctx) {
- Y_UNUSED(ctx);
+ void Bootstrap() {
Become(&TThis::StateWait);
}
STFUNC(StateWait) {
+ TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId));
switch (ev->GetTypeRewrite()) {
- HFunc(TEvPrivate::TEvIndexing, Handle);
- HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+ hFunc(TEvPrivate::TEvIndexing, Handle);
+ hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+ hFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, Handle)
default:
break;
}
@@ -124,22 +127,56 @@ private:
Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, std::move(readOpts)));
}
- void Index(const TActorContext& ctx) {
- Y_VERIFY(TxEvent);
- if (TxEvent->GetPutStatus() == NKikimrProto::UNKNOWN) {
- LOG_S_DEBUG("Indexing started at tablet " << TabletId);
-
+ class TConveyorTask: public NConveyor::ITask {
+ private:
+ std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
+ const TIndexationCounters Counters;
+ protected:
+ virtual bool DoExecute() override {
TCpuGuard guard(TxEvent->ResourceUsage);
+
NOlap::TIndexationLogic indexationLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
TxEvent->Blobs = std::move(indexationLogic.Apply(TxEvent->IndexChanges).DetachResult());
- LOG_S_DEBUG("Indexing finished at tablet " << TabletId);
+ return true;
+ }
+ public:
+ std::unique_ptr<TEvPrivate::TEvWriteIndex> ExtractEvent() {
+ Y_VERIFY(TxEvent);
+ return std::move(TxEvent);
+ }
+
+ TConveyorTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& txEvent, const TIndexationCounters& counters)
+ : TxEvent(std::move(txEvent))
+ , Counters(counters)
+ {
+ Y_VERIFY(TxEvent);
+ }
+ };
+
+ void Index() {
+ Y_VERIFY(TxEvent);
+ if (TxEvent->GetPutStatus() == NKikimrProto::UNKNOWN) {
+ LOG_S_DEBUG("Indexing started at tablet " << TabletId);
+ std::shared_ptr<TConveyorTask> task = std::make_shared<TConveyorTask>(std::move(TxEvent), Counters);
+ NConveyor::TCompServiceOperator::SendTaskToExecute(task);
} else {
LOG_S_ERROR("Indexing failed at tablet " << TabletId);
+ TxEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime;
+ Send(Parent, TxEvent.release());
}
+ }
- TxEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime;
- ctx.Send(Parent, TxEvent.release());
- //Die(ctx); // It's alive till tablet's death
+ void Handle(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
+ auto t = static_pointer_cast<TConveyorTask>(ev->Get()->GetResult());
+ Y_VERIFY_DEBUG(dynamic_pointer_cast<TConveyorTask>(ev->Get()->GetResult()));
+ auto txEvent = t->ExtractEvent();
+ if (t->HasError()) {
+ ACFL_ERROR("event", "task_error")("message", t->GetErrorMessage());
+ } else {
+ LOG_S_DEBUG("Indexing finished at tablet " << TabletId);
+ }
+ txEvent->Duration = TAppData::TimeProvider->Now() - LastActivationTime;
+ Send(Parent, txEvent.release());
}
};
diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp
index 9d65cfb7387..12a4815f4ca 100644
--- a/ydb/core/tx/conveyor/service/service.cpp
+++ b/ydb/core/tx/conveyor/service/service.cpp
@@ -4,10 +4,6 @@
namespace NKikimr::NConveyor {
-NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) {
- return new TDistributor(config, "common", conveyorSignals);
-}
-
TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals)
: Config(config)
, ConveyorName(conveyorName)
@@ -17,9 +13,8 @@ TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, T
}
void TDistributor::Bootstrap() {
- const ui32 workersCount = Config.GetWorkersCountDef(NKqp::TStagePredictor::GetUsableThreads());
+ const ui32 workersCount = Config.GetWorkersCountForConveyor(NKqp::TStagePredictor::GetUsableThreads());
ALS_NOTICE(NKikimrServices::TX_CONVEYOR) << "action=conveyor_registered;actor_id=" << SelfId() << ";workers_count=" << workersCount << ";limit=" << Config.GetQueueSizeLimit();
- TServiceOperator::Register(Config);
for (ui32 i = 0; i < workersCount; ++i) {
Workers.emplace_back(Register(new TWorker()));
}
diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h
index 7bfa88d9708..0ab8a9ea838 100644
--- a/ydb/core/tx/conveyor/service/service.h
+++ b/ydb/core/tx/conveyor/service/service.h
@@ -70,6 +70,4 @@ public:
void Bootstrap();
};
-NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals);
-
}
diff --git a/ydb/core/tx/conveyor/usage/config.cpp b/ydb/core/tx/conveyor/usage/config.cpp
index b73683cd2ac..39ae3803582 100644
--- a/ydb/core/tx/conveyor/usage/config.cpp
+++ b/ydb/core/tx/conveyor/usage/config.cpp
@@ -14,6 +14,20 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config)
if (config.HasWorkersCount()) {
WorkersCount = config.GetWorkersCount();
}
+ if (config.HasDefaultFractionOfThreadsCount()) {
+ DefaultFractionOfThreadsCount = config.GetDefaultFractionOfThreadsCount();
+ }
return true;
}
+
+ui32 TConfig::GetWorkersCountForConveyor(const ui32 poolThreadsCount) const {
+ if (WorkersCount) {
+ return *WorkersCount;
+ } else if (DefaultFractionOfThreadsCount) {
+ return Max<ui32>(1, *DefaultFractionOfThreadsCount * poolThreadsCount);
+ } else {
+ return poolThreadsCount;
+ }
+}
+
}
diff --git a/ydb/core/tx/conveyor/usage/config.h b/ydb/core/tx/conveyor/usage/config.h
index dbce08107ce..45d78d0cebd 100644
--- a/ydb/core/tx/conveyor/usage/config.h
+++ b/ydb/core/tx/conveyor/usage/config.h
@@ -9,8 +9,10 @@ private:
YDB_OPT(ui32, WorkersCount);
YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024);
YDB_READONLY_FLAG(Enabled, true);
+ YDB_OPT(double, DefaultFractionOfThreadsCount);
public:
bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config);
+ ui32 GetWorkersCountForConveyor(const ui32 poolThreadsCount) const;
};
}
diff --git a/ydb/core/tx/conveyor/usage/service.cpp b/ydb/core/tx/conveyor/usage/service.cpp
index 71416b06880..c4e435d65f4 100644
--- a/ydb/core/tx/conveyor/usage/service.cpp
+++ b/ydb/core/tx/conveyor/usage/service.cpp
@@ -2,16 +2,4 @@
namespace NKikimr::NConveyor {
-bool TServiceOperator::IsEnabled() {
- return Singleton<TServiceOperator>()->IsEnabledFlag;
-}
-
-void TServiceOperator::Register(const TConfig& serviceConfig) {
- Singleton<TServiceOperator>()->IsEnabledFlag = serviceConfig.IsEnabled();
-}
-
-NActors::TActorId MakeServiceId(const ui32 nodeId) {
- return NActors::TActorId(nodeId, "SrvcConveyor");
-}
-
}
diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h
index 91d0d797fe4..e015e2538e9 100644
--- a/ydb/core/tx/conveyor/usage/service.h
+++ b/ydb/core/tx/conveyor/usage/service.h
@@ -1,17 +1,61 @@
#pragma once
#include "config.h"
#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/tx/conveyor/service/service.h>
+#include <ydb/core/tx/conveyor/usage/events.h>
namespace NKikimr::NConveyor {
-class TServiceOperator {
+template <class TConveyorPolicy>
+class TServiceOperatorImpl {
private:
+ using TSelf = TServiceOperatorImpl<TConveyorPolicy>;
std::atomic<bool> IsEnabledFlag = false;
+ static void Register(const TConfig& serviceConfig) {
+ Singleton<TSelf>()->IsEnabledFlag = serviceConfig.IsEnabled();
+ }
+ static const TString& GetConveyorName() {
+ Y_VERIFY(TConveyorPolicy::Name.size() == 4);
+ return TConveyorPolicy::Name;
+ }
public:
- static bool IsEnabled();
- static void Register(const TConfig& serviceConfig);
+ static bool SendTaskToExecute(const std::shared_ptr<ITask>& task) {
+ auto& context = NActors::TActorContext::AsActorContext();
+ const NActors::TActorId& selfId = context.SelfID;
+ if (TSelf::IsEnabled()) {
+ context.Send(MakeServiceId(selfId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task));
+ return true;
+ } else {
+ task->Execute();
+ context.Send(selfId, new NConveyor::TEvExecution::TEvTaskProcessedResult(task));
+ return false;
+ }
+ }
+ static bool IsEnabled() {
+ return Singleton<TSelf>()->IsEnabledFlag;
+ }
+ static NActors::TActorId MakeServiceId(const ui32 nodeId) {
+ return NActors::TActorId(nodeId, "SrvcConv" + GetConveyorName());
+ }
+ static NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) {
+ Register(config);
+ return new TDistributor(config, GetConveyorName(), conveyorSignals);
+ }
+
+};
+
+class TScanConveyorPolicy {
+public:
+ static const inline TString Name = "Scan";
+};
+
+class TCompConveyorPolicy {
+public:
+ static const inline TString Name = "Comp";
};
-NActors::TActorId MakeServiceId(const ui32 nodeId);
+using TScanServiceOperator = TServiceOperatorImpl<TScanConveyorPolicy>;
+using TCompServiceOperator = TServiceOperatorImpl<TCompConveyorPolicy>;
}