aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-04 19:21:15 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-04 19:21:15 +0300
commit3f75ec8cd3385bb1b3218186ed863714070c3b47 (patch)
tree7e3690351d64b4bc30770039f7def5d984b81d0c
parenteacce0097cb55d01d55c0da863f59edd2d9f0d1f (diff)
downloadydb-3f75ec8cd3385bb1b3218186ed863714070c3b47.tar.gz
parallel cs-scan through conveyor service
-rw-r--r--ydb/core/base/events.h1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/config.h3
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp26
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h6
-rw-r--r--ydb/core/driver_lib/run/run.cpp6
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp1
-rw-r--r--ydb/core/protos/config.proto7
-rw-r--r--ydb/core/protos/services.proto1
-rw-r--r--ydb/core/testlib/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/testlib/test_client.cpp15
-rw-r--r--ydb/core/tx/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.h31
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp97
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h5
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp88
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h91
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp2
-rw-r--r--ydb/core/tx/conveyor/CMakeLists.txt10
-rw-r--r--ydb/core/tx/conveyor/service/CMakeLists.darwin-x86_64.txt20
-rw-r--r--ydb/core/tx/conveyor/service/CMakeLists.linux-aarch64.txt21
-rw-r--r--ydb/core/tx/conveyor/service/CMakeLists.linux-x86_64.txt21
-rw-r--r--ydb/core/tx/conveyor/service/CMakeLists.txt17
-rw-r--r--ydb/core/tx/conveyor/service/CMakeLists.windows-x86_64.txt20
-rw-r--r--ydb/core/tx/conveyor/service/service.cpp66
-rw-r--r--ydb/core/tx/conveyor/service/service.h44
-rw-r--r--ydb/core/tx/conveyor/service/worker.cpp14
-rw-r--r--ydb/core/tx/conveyor/service/worker.h90
-rw-r--r--ydb/core/tx/conveyor/usage/CMakeLists.darwin-x86_64.txt22
-rw-r--r--ydb/core/tx/conveyor/usage/CMakeLists.linux-aarch64.txt23
-rw-r--r--ydb/core/tx/conveyor/usage/CMakeLists.linux-x86_64.txt23
-rw-r--r--ydb/core/tx/conveyor/usage/CMakeLists.txt17
-rw-r--r--ydb/core/tx/conveyor/usage/CMakeLists.windows-x86_64.txt22
-rw-r--r--ydb/core/tx/conveyor/usage/abstract.cpp5
-rw-r--r--ydb/core/tx/conveyor/usage/abstract.h17
-rw-r--r--ydb/core/tx/conveyor/usage/config.cpp19
-rw-r--r--ydb/core/tx/conveyor/usage/config.h16
-rw-r--r--ydb/core/tx/conveyor/usage/events.cpp5
-rw-r--r--ydb/core/tx/conveyor/usage/events.h39
-rw-r--r--ydb/core/tx/conveyor/usage/service.cpp17
-rw-r--r--ydb/core/tx/conveyor/usage/service.h17
-rw-r--r--ydb/services/metadata/request/common.h5
55 files changed, 904 insertions, 62 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index be982117f31..6b57e420d23 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -160,6 +160,7 @@ struct TKikimrEvents : TEvents {
ES_GRPC_CANCELATION,
ES_DISCOVERY,
ES_EXT_INDEX,
+ ES_CONVEYOR,
};
};
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
index 211c479be13..cce9e766c78 100644
--- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
@@ -92,6 +92,7 @@ target_link_libraries(run PUBLIC
ydb-core-tx
core-tx-columnshard
core-tx-coordinator
+ tx-conveyor-service
core-tx-datashard
core-tx-long_tx_service
tx-long_tx_service-public
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
index 4bfff4150d8..6c34ef2660f 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
@@ -93,6 +93,7 @@ target_link_libraries(run PUBLIC
ydb-core-tx
core-tx-columnshard
core-tx-coordinator
+ tx-conveyor-service
core-tx-datashard
core-tx-long_tx_service
tx-long_tx_service-public
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
index 4bfff4150d8..6c34ef2660f 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
@@ -93,6 +93,7 @@ target_link_libraries(run PUBLIC
ydb-core-tx
core-tx-columnshard
core-tx-coordinator
+ tx-conveyor-service
core-tx-datashard
core-tx-long_tx_service
tx-long_tx_service-public
diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
index 211c479be13..cce9e766c78 100644
--- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
@@ -92,6 +92,7 @@ target_link_libraries(run PUBLIC
ydb-core-tx
core-tx-columnshard
core-tx-coordinator
+ tx-conveyor-service
core-tx-datashard
core-tx-long_tx_service
tx-long_tx_service-public
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index b584d31ea62..905ac5f22db 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -69,7 +69,8 @@ union TBasicKikimrServicesMask {
bool EnableMetadataProvider:1;
bool EnableReplicationService:1;
bool EnableBackgroundTasks:1;
- bool ExternalIndex:1;
+ bool EnableExternalIndex: 1;
+ bool EnableConveyor: 1;
};
ui64 Raw;
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index c7b9d5126e8..9a2200fff7a 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -157,6 +157,10 @@
#include <ydb/services/metadata/ds_table/service.h>
#include <ydb/services/metadata/service.h>
+#include <ydb/core/tx/conveyor/usage/config.h>
+#include <ydb/core/tx/conveyor/service/service.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
+
#include <ydb/services/bg_tasks/ds_table/executor.h>
#include <ydb/services/bg_tasks/service.h>
#include <ydb/services/ext_index/common/config.h>
@@ -2344,6 +2348,28 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}
+TConveyorInitializer::TConveyorInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig) {
+}
+
+void TConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ NConveyor::TConfig serviceConfig;
+ if (Config.HasConveyorConfig()) {
+ Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetConveyorConfig()));
+ }
+
+ if (serviceConfig.IsEnabled()) {
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_CONVEYOR");
+
+ auto service = NConveyor::CreateService(serviceConfig, conveyorGroup);
+
+ setup->LocalServices.push_back(std::make_pair(
+ NConveyor::MakeServiceId(NodeId),
+ TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
+ }
+}
+
TExternalIndexInitializer::TExternalIndexInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 74c76ab1ecd..832b3861238 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -383,6 +383,12 @@ private:
std::shared_ptr<TModuleFactories> Factories;
};
+class TConveyorInitializer: public IKikimrServicesInitializer {
+public:
+ TConveyorInitializer(const TKikimrRunConfig& runConfig);
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
class TExternalIndexInitializer: public IKikimrServicesInitializer {
public:
TExternalIndexInitializer(const TKikimrRunConfig& runConfig);
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 0c28f502738..bfc3a8b623c 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1487,10 +1487,14 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TMetadataProviderInitializer(runConfig));
}
- if (serviceMask.ExternalIndex) {
+ if (serviceMask.EnableExternalIndex) {
sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig));
}
+ if (serviceMask.EnableConveyor) {
+ sil->AddServiceInitializer(new TConveyorInitializer(runConfig));
+ }
+
if (serviceMask.EnableBackgroundTasks) {
sil->AddServiceInitializer(new TBackgroundTasksInitializer(runConfig));
}
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 62041b9d870..36a276e73ad 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -50,6 +50,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
//runtime->SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NActors::NLog::PRI_DEBUG);
runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_TRACE);
runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
+ runtime->SetLogPriority(NKikimrServices::TX_CONVEYOR, NActors::NLog::PRI_DEBUG);
//runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG);
//runtime->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 517864cb64a..b889ef1cfda 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -602,6 +602,12 @@ message TInternalRequestConfig {
optional uint32 RetryPeriodFinishSeconds = 2 [default = 30];
}
+message TConveyorConfig {
+ optional bool Enabled = 1 [default = true];
+ optional uint32 WorkersCount = 2;
+ optional uint32 QueueSizeLimit = 3;
+}
+
message TExternalIndexConfig {
optional bool Enabled = 1 [default = true];
optional TInternalRequestConfig RequestConfig = 2;
@@ -1799,6 +1805,7 @@ message TAppConfig {
optional TClientCertificateAuthorization ClientCertificateAuthorization = 62;
optional TExternalIndexConfig ExternalIndexConfig = 63;
optional bool YamlConfigEnabled = 64;
+ optional TConveyorConfig ConveyorConfig = 65;
repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index d1c1c54fedd..feac1edc5b2 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -362,6 +362,7 @@ enum EServiceKikimr {
DISCOVERY_CACHE = 1801;
EXT_INDEX = 1900;
+ TX_CONVEYOR = 2000;
};
message TActivity {
diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
index 55e14f4e7c1..b5ca30b917c 100644
--- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
@@ -86,6 +86,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-datastreams
ydb-services-discovery
services-ext_index-service
+ tx-conveyor-service
ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt
index 90b7724e073..87c8de49023 100644
--- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt
@@ -87,6 +87,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-datastreams
ydb-services-discovery
services-ext_index-service
+ tx-conveyor-service
ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt
index 90b7724e073..87c8de49023 100644
--- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt
@@ -87,6 +87,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-datastreams
ydb-services-discovery
services-ext_index-service
+ tx-conveyor-service
ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt
index 55e14f4e7c1..b5ca30b917c 100644
--- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt
@@ -86,6 +86,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-datastreams
ydb-services-discovery
services-ext_index-service
+ tx-conveyor-service
ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 162b35650b8..4c9678c8af8 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -95,6 +95,8 @@
#include <ydb/services/ext_index/common/config.h>
#include <ydb/services/ext_index/common/service.h>
#include <ydb/services/ext_index/service/executor.h>
+#include <ydb/core/tx/conveyor/service/service.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/library/folder_service/mock/mock_folder_service.h>
#include <ydb/core/client/server/msgbus_server_tracer.h>
@@ -712,22 +714,27 @@ namespace Tests {
if (Settings->EnableConfigsDispatcher) {
auto *dispatcher = NConsole::CreateConfigsDispatcher(Settings->AppConfig, {});
auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
- Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid);
+ Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
if (Settings->IsEnableMetadataProvider()) {
auto* actor = NMetadata::NProvider::CreateService(NMetadata::NProvider::TConfig());
const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
- Runtime->RegisterService(NMetadata::NProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid);
+ Runtime->RegisterService(NMetadata::NProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
if (Settings->IsEnableBackgroundTasks()) {
auto* actor = NBackgroundTasks::CreateService(NBackgroundTasks::TConfig());
const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
- Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid);
+ Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
if (Settings->IsEnableExternalIndex()) {
auto* actor = NCSIndex::CreateService(NCSIndex::TConfig());
const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
- Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid);
+ Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
+ }
+ {
+ auto* actor = NConveyor::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters());
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
+ Runtime->RegisterService(NConveyor::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
diff --git a/ydb/core/tx/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/CMakeLists.darwin-x86_64.txt
index e4fe0eea1fc..3589716aac5 100644
--- a/ydb/core/tx/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/CMakeLists.darwin-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(balance_coverage)
add_subdirectory(columnshard)
+add_subdirectory(conveyor)
add_subdirectory(coordinator)
add_subdirectory(datashard)
add_subdirectory(long_tx_service)
diff --git a/ydb/core/tx/CMakeLists.linux-aarch64.txt b/ydb/core/tx/CMakeLists.linux-aarch64.txt
index f2f8b8e3e5a..d0bf8cc5773 100644
--- a/ydb/core/tx/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/CMakeLists.linux-aarch64.txt
@@ -8,6 +8,7 @@
add_subdirectory(balance_coverage)
add_subdirectory(columnshard)
+add_subdirectory(conveyor)
add_subdirectory(coordinator)
add_subdirectory(datashard)
add_subdirectory(long_tx_service)
diff --git a/ydb/core/tx/CMakeLists.linux-x86_64.txt b/ydb/core/tx/CMakeLists.linux-x86_64.txt
index f2f8b8e3e5a..d0bf8cc5773 100644
--- a/ydb/core/tx/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/CMakeLists.linux-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(balance_coverage)
add_subdirectory(columnshard)
+add_subdirectory(conveyor)
add_subdirectory(coordinator)
add_subdirectory(datashard)
add_subdirectory(long_tx_service)
diff --git a/ydb/core/tx/CMakeLists.windows-x86_64.txt b/ydb/core/tx/CMakeLists.windows-x86_64.txt
index e4fe0eea1fc..3589716aac5 100644
--- a/ydb/core/tx/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/CMakeLists.windows-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(balance_coverage)
add_subdirectory(columnshard)
+add_subdirectory(conveyor)
add_subdirectory(coordinator)
add_subdirectory(datashard)
add_subdirectory(long_tx_service)
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index 5c35ada84d1..a81d192131f 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -36,6 +36,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
ydb-core-tablet_flat
tx-columnshard-engines
core-tx-tiering
+ tx-conveyor-usage
tx-long_tx_service-public
ydb-core-util
api-protos
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 19ea4034a03..77e8b4de116 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -37,6 +37,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
ydb-core-tablet_flat
tx-columnshard-engines
core-tx-tiering
+ tx-conveyor-usage
tx-long_tx_service-public
ydb-core-util
api-protos
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 19ea4034a03..77e8b4de116 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -37,6 +37,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
ydb-core-tablet_flat
tx-columnshard-engines
core-tx-tiering
+ tx-conveyor-usage
tx-long_tx_service-public
ydb-core-util
api-protos
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 5c35ada84d1..a81d192131f 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -36,6 +36,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
ydb-core-tablet_flat
tx-columnshard-engines
core-tx-tiering
+ tx-conveyor-usage
tx-long_tx_service-public
ydb-core-util
api-protos
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index d9d29661d2d..4e92a1ebaea 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -30,9 +30,6 @@ using NOlap::TBlobRange;
class TColumnShardScanIterator : public TScanIteratorBase {
NOlap::TReadMetadata::TConstPtr ReadMetadata;
NOlap::TIndexedReadData IndexedData;
- THashMap<TBlobRange, ui64> IndexedBlobs; // blobId -> granule
- THashSet<TBlobRange> WaitIndexed;
- THashMap<ui64, THashSet<TBlobRange>> GranuleBlobs; // granule -> blobs
std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted;
TVector<TBlobRange> BlobsToRead;
ui64 NextBlobIdxToRead = 0;
@@ -51,10 +48,10 @@ public:
const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
WaitCommitted.emplace(cmtBlob, batchNo);
}
- IndexedBlobs = IndexedData.InitRead(batchNo, true);
- for (auto& [blobId, granule] : IndexedBlobs) {
- WaitIndexed.insert(blobId);
- GranuleBlobs[granule].insert(blobId);
+ auto indexedBlobs = IndexedData.InitRead(batchNo, true);
+ THashMap<ui64, THashSet<TBlobRange>> granuleBlobs; // granule -> blobs
+ for (auto& [blobId, granule] : indexedBlobs) {
+ granuleBlobs[granule].insert(blobId);
}
// Add cached batches without read
@@ -78,21 +75,21 @@ public:
// Read all indexed blobs (in correct order)
auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
for (ui64 granule : granulesOrder) {
- auto& blobs = GranuleBlobs[granule];
+ auto& blobs = granuleBlobs[granule];
BlobsToRead.insert(BlobsToRead.end(), blobs.begin(), blobs.end());
}
IsReadFinished = ReadMetadata->Empty();
}
- void AddData(const TBlobRange& blobRange, TString data) override {
+ virtual void Apply(IDataPreparationTask::TPtr task) override {
+ task->Apply(IndexedData);
+ }
+
+ void AddData(const TBlobRange& blobRange, TString data, IDataTasksProcessor::TPtr processor) override {
const auto& blobId = blobRange.BlobId;
- if (IndexedBlobs.count(blobRange)) {
- if (!WaitIndexed.count(blobRange)) {
- return; // ignore duplicate parts
- }
- WaitIndexed.erase(blobRange);
- IndexedData.AddIndexed(blobRange, data);
+ if (IndexedData.IsIndexedBlob(blobRange)) {
+ IndexedData.AddIndexed(blobRange, data, processor);
} else {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
if (cmt.empty()) {
@@ -157,11 +154,11 @@ private:
if (limitLeft == 0) {
WaitCommitted.clear();
- WaitIndexed.clear();
+ IndexedData.ForceFinishWaiting();
IsReadFinished = true;
}
- if (WaitCommitted.empty() && WaitIndexed.empty() && NextBlobIdxToRead == BlobsToRead.size()) {
+ if (WaitCommitted.empty() && !IndexedData.HasWaitIndexed() && NextBlobIdxToRead == BlobsToRead.size()) {
IsReadFinished = true;
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 1172dc2171d..da8da9ee25e 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -10,22 +10,25 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/actorlib_impl/long_timer.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/tx/conveyor/usage/events.h>
#include <ydb/library/yql/core/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/services/metadata/request/common.h>
namespace NKikimr::NColumnShard {
using namespace NKqp;
using NBlobCache::TBlobRange;
-class TTxScan : public TTxReadBase {
+class TTxScan: public TTxReadBase {
public:
using TReadMetadataPtr = NOlap::TReadMetadataBase::TConstPtr;
TTxScan(TColumnShard* self, TEvColumnShard::TEvScan::TPtr& ev)
: TTxReadBase(self)
- , Ev(ev)
- {}
+ , Ev(ev) {
+ }
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
@@ -42,10 +45,25 @@ private:
constexpr ui64 INIT_BATCH_ROWS = 1000;
-constexpr i64 DEFAULT_READ_AHEAD_BYTES = 100*1024*1024;
+constexpr i64 DEFAULT_READ_AHEAD_BYTES = 100 * 1024 * 1024;
constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(10);
constexpr TDuration SCAN_HARD_TIMEOUT_GAP = TDuration::Seconds(5);
+class TLocalDataTasksProcessor: public IDataTasksProcessor {
+private:
+ const TActorIdentity OwnerActorId;
+protected:
+ virtual bool DoAdd(IDataPreparationTask::TPtr task) override {
+ OwnerActorId.Send(NConveyor::MakeServiceId(OwnerActorId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task));
+ return true;
+ }
+public:
+ TLocalDataTasksProcessor(const TActorIdentity& ownerActorId)
+ : OwnerActorId(ownerActorId)
+ {
+ }
+};
+
class TColumnShardScan : public TActorBootstrapped<TColumnShardScan>, NArrow::IRowWriter {
public:
static constexpr auto ActorActivityType() {
@@ -84,6 +102,9 @@ public:
// propagate self actor id // TODO: FlagSubscribeOnSession ?
Send(ScanComputeActorId, new TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen), IEventHandle::FlagTrackDelivery);
+ if (NConveyor::TServiceOperator::IsEnabled()) {
+ DataTasksProcessor = std::make_shared<TLocalDataTasksProcessor>(SelfId());
+ }
Become(&TColumnShardScan::StateScan);
}
@@ -96,6 +117,7 @@ private:
hFunc(TEvKqp::TEvAbortExecution, HandleScan);
hFunc(TEvents::TEvUndelivered, HandleScan);
hFunc(TEvents::TEvWakeup, HandleScan);
+ hFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, HandleScan);
default:
Y_FAIL("TColumnShardScan: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite());
}
@@ -135,7 +157,23 @@ private:
return true;
}
+ void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN) << SelfId();
+ Stats.TaskProcessed();
+ if (ev->Get()->GetErrorMessage()) {
+ DataTasksProcessor->Stop();
+ SendScanError(ev->Get()->GetErrorMessage());
+ Finish();
+ } else {
+ auto t = dynamic_pointer_cast<IDataPreparationTask>(ev->Get()->GetResult());
+ Y_VERIFY(t);
+ ScanIterator->Apply(t);
+ }
+ ContinueProcessing();
+ }
+
void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
+ Stats.TaskProcessed();
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " got ScanDataAck"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
@@ -156,6 +194,7 @@ private:
}
void HandleScan(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) {
+ auto g = Stats.MakeGuard("EvResult");
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " blobs response:"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
@@ -188,7 +227,10 @@ private:
<< " prevFreeSpace: " << PeerFreeSpace);
if (ScanIterator) {
- ScanIterator->AddData(blobRange, event.Data);
+ {
+ auto g = Stats.MakeGuard("AddData");
+ ScanIterator->AddData(blobRange, event.Data, DataTasksProcessor);
+ }
ContinueProcessing();
}
}
@@ -324,6 +366,7 @@ private:
// * or there is an in-flight blob read or ScanData message for which
// we will get a reply and will be able to proceed futher
if (!ScanIterator || InFlightScanDataMessages != 0 || InFlightReads != 0) {
+ Stats.StartWait();
return;
}
}
@@ -332,6 +375,7 @@ private:
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " is hanging"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ Stats.StartWait();
}
void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) {
@@ -391,6 +435,7 @@ private:
}
void NextReadMetadata() {
+ auto g = Stats.MakeGuard("NextReadMetadata");
ScanIterator.reset();
++ReadMetadataIndex;
@@ -561,6 +606,8 @@ private:
i64 InFlightScanDataMessages = 0;
bool Finished = false;
+ IDataTasksProcessor::TPtr DataTasksProcessor;
+
class TBlobStats {
private:
ui64 PartsCount = 0;
@@ -598,11 +645,25 @@ private:
TBlobStats CacheBlobs;
TBlobStats MissBlobs;
THashMap<TString, TDuration> GuardedDurations;
+ THashMap<TString, TInstant> StartGuards;
+ std::optional<TInstant> FirstReceived;
+ TInstant LastWaitStart;
+ TDuration WaitReceive;
public:
+ void StartWait() {
+ Y_VERIFY_DEBUG(!LastWaitStart);
+ LastWaitStart = Now();
+ }
+
TString DebugString() const {
TStringBuilder sb;
sb << "SCAN_STATS;";
+ sb << "start=" << StartInstant << ";";
+ if (FirstReceived) {
+ sb << "frw=" << *FirstReceived - StartInstant << ";";
+ }
+ sb << "srw=" << WaitReceive << ";";
sb << "d=" << FinishInstant - StartInstant << ";";
if (RequestsCount) {
sb << "req:{count=" << RequestsCount << ";bytes=" << RequestedBytes << ";bytes_avg=" << RequestedBytes / RequestsCount << "};";
@@ -612,7 +673,12 @@ private:
sb << "NO_REQUESTS;";
}
for (auto&& i : GuardedDurations) {
- sb << i.first << "=" << i.second << ";";
+ auto it = StartGuards.find(i.first);
+ TDuration delta;
+ if (it != StartGuards.end()) {
+ delta = Now() - it->second;
+ }
+ sb << i.first << "=" << i.second + delta << ";";
}
return sb;
}
@@ -621,17 +687,18 @@ private:
private:
TScanStats& Owner;
const TInstant Start = Now();
- TString SectionName;
+ const TString SectionName;
public:
TGuard(const TString& sectionName, TScanStats& owner)
: Owner(owner)
, SectionName(sectionName)
{
-
+ Y_VERIFY(Owner.StartGuards.emplace(SectionName, Start).second);
}
~TGuard() {
Owner.GuardedDurations[SectionName] += Now() - Start;
+ Owner.StartGuards.erase(SectionName);
}
};
@@ -648,7 +715,21 @@ private:
}
}
+ void TaskProcessed() {
+ if (LastWaitStart) {
+ WaitReceive += Now() - LastWaitStart;
+ LastWaitStart = TInstant::Zero();
+ }
+ }
+
void BlobReceived(const NBlobCache::TBlobRange& br, const bool fromCache, const TInstant replyInstant) {
+ if (!FirstReceived) {
+ FirstReceived = Now();
+ }
+ if (LastWaitStart) {
+ WaitReceive += Now() - LastWaitStart;
+ LastWaitStart = TInstant::Zero();
+ }
auto it = StartBlobRequest.find(br);
Y_VERIFY(it != StartBlobRequest.end());
const TDuration d = replyInstant - it->second;
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index 1447d494c18..f462aedc469 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -9,7 +9,10 @@ class TScanIteratorBase {
public:
virtual ~TScanIteratorBase() = default;
- virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/) {}
+ virtual void Apply(IDataPreparationTask::TPtr /*processor*/) {
+
+ }
+ virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/, IDataTasksProcessor::TPtr /*processor*/) {}
virtual bool Finished() const = 0;
virtual NOlap::TPartialReadResult GetBatch() = 0;
virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); }
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 4466b546a73..050344eee48 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -114,6 +114,33 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
}
+bool TIndexedReadData::TAssembledNotFiltered::DoExecuteImpl() {
+ /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey.
+ /// It's not OK to apply predicate before replacing key duplicates otherwise.
+ /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
+ auto filtered = NOlap::FilterPortion(Batch, *ReadMetadata);
+ if (filtered.Batch) {
+ Y_VERIFY(filtered.Valid());
+ filtered.ApplyFilter();
+ }
+#if 1 // optimization
+ if (filtered.Batch && ReadMetadata->Program && AllowEarlyFilter) {
+ filtered = NOlap::EarlyFilter(filtered.Batch, ReadMetadata->Program);
+ }
+ if (filtered.Batch) {
+ Y_VERIFY(filtered.Valid());
+ filtered.ApplyFilter();
+ }
+#endif
+ FilteredBatch = filtered.Batch;
+ return true;
+}
+
+bool TIndexedReadData::TAssembledNotFiltered::DoApply(TIndexedReadData& owner) const {
+ owner.PortionFinished(BatchNo, FilteredBatch);
+ return true;
+}
+
std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan() const {
return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this());
}
@@ -218,27 +245,34 @@ THashMap<TBlobRange, ui64> TIndexedReadData::InitRead(ui32 inputBatch, bool inGr
return out;
}
-void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& column) {
+void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data, NColumnShard::IDataTasksProcessor::TPtr processor) {
Y_VERIFY(IndexedBlobs.count(blobRange));
ui32 batchNo = IndexedBlobs[blobRange];
if (!WaitIndexed.count(batchNo)) {
return;
}
auto& waitingFor = WaitIndexed[batchNo];
- waitingFor.erase(blobRange);
+ if (waitingFor.erase(blobRange) != 1) {
+ return;
+ }
- Data[blobRange] = column;
+ Data[blobRange] = data;
if (waitingFor.empty()) {
- WaitIndexed.erase(batchNo);
- if (auto batch = AssembleIndexedBatch(batchNo)) {
- Indexed[batchNo] = batch;
+ if (auto batch = AssembleIndexedBatch(batchNo, processor)) {
+ if (processor) {
+ processor->Add(batch);
+ } else {
+ batch->Execute();
+ batch->Apply(*this);
+ }
+ } else {
+ WaitIndexed.erase(batchNo);
}
- UpdateGranuleWaits(batchNo);
}
}
-std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32 batchNo) {
+NColumnShard::IDataPreparationTask::TPtr TIndexedReadData::AssembleIndexedBatch(ui32 batchNo, NColumnShard::IDataTasksProcessor::TPtr processor) {
auto& portionInfo = Portion(batchNo);
Y_VERIFY(portionInfo.Produced());
@@ -250,24 +284,7 @@ std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32
Data.erase(blobRange);
}
- /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey.
- /// It's not OK to apply predicate before replacing key duplicates otherwise.
- /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
- auto filtered = NOlap::FilterPortion(batch, *ReadMetadata);
- if (filtered.Batch) {
- Y_VERIFY(filtered.Valid());
- filtered.ApplyFilter();
- }
-#if 1 // optimization
- if (filtered.Batch && ReadMetadata->Program && portionInfo.AllowEarlyFilter()) {
- filtered = NOlap::EarlyFilter(filtered.Batch, ReadMetadata->Program);
- }
- if (filtered.Batch) {
- Y_VERIFY(filtered.Valid());
- filtered.ApplyFilter();
- }
-#endif
- return filtered.Batch;
+ return std::make_shared<TAssembledNotFiltered>(batch, ReadMetadata, batchNo, portionInfo.AllowEarlyFilter(), processor);
}
void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) {
@@ -590,4 +607,23 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
return out;
}
+void TIndexedReadData::PortionFinished(const ui32 batchNo, std::shared_ptr<arrow::RecordBatch> batch) {
+ WaitIndexed.erase(batchNo);
+ if (batch) {
+ Y_VERIFY(Indexed.emplace(batchNo, batch).second);
+ }
+ UpdateGranuleWaits(batchNo);
+}
+
+}
+
+namespace NKikimr::NColumnShard {
+
+bool IDataPreparationTask::DoExecute() {
+ if (OwnerOperator && OwnerOperator->IsStopped()) {
+ return true;
+ } else {
+ return DoExecuteImpl();
+ }
+}
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index bc6764dd0b3..b815aa216b7 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -3,12 +3,62 @@
#include "column_engine.h"
#include "column_engine_logs.h" // for TColumnEngineForLogs::TMark
#include "predicate.h"
+#include <ydb/core/tx/conveyor/usage/abstract.h>
namespace NKikimr::NColumnShard {
class TScanIteratorBase;
}
namespace NKikimr::NOlap {
+class TIndexedReadData;
+}
+
+namespace NKikimr::NColumnShard {
+
+class IDataTasksProcessor;
+
+class IDataPreparationTask: public NConveyor::ITask {
+private:
+ std::shared_ptr<IDataTasksProcessor> OwnerOperator;
+protected:
+ virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0;
+ virtual bool DoExecuteImpl() = 0;
+
+ virtual bool DoExecute() override final;
+public:
+ IDataPreparationTask(std::shared_ptr<IDataTasksProcessor> ownerOperator)
+ : OwnerOperator(ownerOperator)
+ {
+
+ }
+ using TPtr = std::shared_ptr<IDataPreparationTask>;
+ virtual ~IDataPreparationTask() = default;
+ bool Apply(NOlap::TIndexedReadData& indexedDataRead) const {
+ return DoApply(indexedDataRead);
+ }
+};
+
+class IDataTasksProcessor {
+protected:
+ virtual bool DoAdd(IDataPreparationTask::TPtr task) = 0;
+ std::atomic<bool> Stopped = false;
+public:
+ void Stop() {
+ Stopped = true;
+ }
+ bool IsStopped() const {
+ return Stopped;
+ }
+
+ using TPtr = std::shared_ptr<IDataTasksProcessor>;
+ virtual ~IDataTasksProcessor() = default;
+ bool Add(IDataPreparationTask::TPtr task) {
+ return DoAdd(task);
+ }
+};
+}
+
+namespace NKikimr::NOlap {
struct TReadStats {
TInstant BeginTimestamp;
@@ -207,16 +257,50 @@ public:
NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId);
}
- void AddIndexed(const TBlobRange& blobRange, const TString& column);
+ void AddIndexed(const TBlobRange& blobRange, const TString& column, NColumnShard::IDataTasksProcessor::TPtr processor);
size_t NumPortions() const { return PortionBatch.size(); }
bool HasIndexRead() const { return WaitIndexed.size() || Indexed.size(); }
-
+ bool IsIndexedBlob(const TBlobRange& blobRange) const {
+ return IndexedBlobs.contains(blobRange);
+ }
+ void ForceFinishWaiting() {
+ WaitIndexed.clear();
+ }
+ bool HasWaitIndexed() const {
+ return WaitIndexed.size();
+ }
private:
NOlap::TReadMetadata::TConstPtr ReadMetadata;
ui32 FirstIndexedBatch{0};
THashMap<TBlobRange, TString> Data;
std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
THashMap<ui32, std::shared_ptr<arrow::RecordBatch>> Indexed;
+
+ class TAssembledNotFiltered: public NColumnShard::IDataPreparationTask {
+ private:
+ using TBase = NColumnShard::IDataPreparationTask;
+ std::shared_ptr<arrow::RecordBatch> Batch;
+ std::shared_ptr<arrow::RecordBatch> FilteredBatch;
+ NOlap::TReadMetadata::TConstPtr ReadMetadata;
+ ui32 BatchNo = 0;
+ bool AllowEarlyFilter = false;
+ protected:
+ virtual bool DoApply(TIndexedReadData& owner) const override;
+ virtual bool DoExecuteImpl() override;
+ public:
+ TAssembledNotFiltered(std::shared_ptr<arrow::RecordBatch> batch, NOlap::TReadMetadata::TConstPtr readMetadata,
+ const ui32 batchNo, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor)
+ : TBase(processor)
+ , Batch(batch)
+ , ReadMetadata(readMetadata)
+ , BatchNo(batchNo)
+ , AllowEarlyFilter(allowEarlyFilter)
+ {
+
+ }
+ };
+ void PortionFinished(const ui32 batchNo, std::shared_ptr<arrow::RecordBatch> batch);
+
THashMap<ui32, THashSet<TBlobRange>> WaitIndexed;
THashMap<TBlobRange, ui32> IndexedBlobs; // blobId -> batchNo
ui32 ReadyNotIndexed{0};
@@ -251,7 +335,8 @@ private:
std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const;
- std::shared_ptr<arrow::RecordBatch> AssembleIndexedBatch(ui32 batchNo);
+
+ NColumnShard::IDataPreparationTask::TPtr AssembleIndexedBatch(ui32 batchNo, NColumnShard::IDataTasksProcessor::TPtr processor);
void UpdateGranuleWaits(ui32 batchNo);
std::shared_ptr<arrow::RecordBatch> MergeNotIndexed(
std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const;
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 11b053aea16..7a0d8095d07 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -51,7 +51,7 @@ public:
return; // ignore duplicate parts
}
WaitIndexed.erase(event.BlobRange);
- IndexedData.AddIndexed(event.BlobRange, event.Data);
+ IndexedData.AddIndexed(event.BlobRange, event.Data, nullptr);
} else if (CommittedBlobs.count(blobId)) {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
if (cmt.empty()) {
diff --git a/ydb/core/tx/conveyor/CMakeLists.txt b/ydb/core/tx/conveyor/CMakeLists.txt
new file mode 100644
index 00000000000..57251a735fa
--- /dev/null
+++ b/ydb/core/tx/conveyor/CMakeLists.txt
@@ -0,0 +1,10 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(service)
+add_subdirectory(usage)
diff --git a/ydb/core/tx/conveyor/service/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/conveyor/service/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..54a440d2794
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-conveyor-service)
+target_link_libraries(tx-conveyor-service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ tx-conveyor-usage
+ ydb-core-protos
+)
+target_sources(tx-conveyor-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/worker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/service.cpp
+)
diff --git a/ydb/core/tx/conveyor/service/CMakeLists.linux-aarch64.txt b/ydb/core/tx/conveyor/service/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..b862166c4cf
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-conveyor-service)
+target_link_libraries(tx-conveyor-service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ tx-conveyor-usage
+ ydb-core-protos
+)
+target_sources(tx-conveyor-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/worker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/service.cpp
+)
diff --git a/ydb/core/tx/conveyor/service/CMakeLists.linux-x86_64.txt b/ydb/core/tx/conveyor/service/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..b862166c4cf
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-conveyor-service)
+target_link_libraries(tx-conveyor-service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ tx-conveyor-usage
+ ydb-core-protos
+)
+target_sources(tx-conveyor-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/worker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/service.cpp
+)
diff --git a/ydb/core/tx/conveyor/service/CMakeLists.txt b/ydb/core/tx/conveyor/service/CMakeLists.txt
new file mode 100644
index 00000000000..a692f90f36e
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64")
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/conveyor/service/CMakeLists.windows-x86_64.txt b/ydb/core/tx/conveyor/service/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..54a440d2794
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-conveyor-service)
+target_link_libraries(tx-conveyor-service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ tx-conveyor-usage
+ ydb-core-protos
+)
+target_sources(tx-conveyor-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/worker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/service.cpp
+)
diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp
new file mode 100644
index 00000000000..710f372276b
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/service.cpp
@@ -0,0 +1,66 @@
+#include "service.h"
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/kqp/query_data/kqp_predictor.h>
+
+namespace NKikimr::NConveyor {
+
+NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) {
+ return new TDistributor(config, "common", conveyorSignals);
+}
+
+TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals)
+ : Config(config)
+ , ConveyorName(conveyorName)
+ , WaitingQueueSize(conveyorSignals->GetCounter("WaitingQueueSize"))
+ , WorkersCount(conveyorSignals->GetCounter("WorkersCount"))
+ , WorkersCountLimit(conveyorSignals->GetCounter("WorkersCountLimit"))
+ , IncomingRate(conveyorSignals->GetCounter("Incoming", true))
+ , SolutionsRate(conveyorSignals->GetCounter("Solved", true)) {
+
+}
+
+void TDistributor::Bootstrap() {
+ ALS_NOTICE(NKikimrServices::TX_CONVEYOR) << "conveyor registered: " << SelfId();
+ TServiceOperator::Register(Config);
+ for (ui32 i = 0; i < Config.GetWorkersCountDef(NKqp::TStagePredictor::GetUsableThreads()); ++i) {
+ Workers.emplace_back(Register(new TWorker()));
+ }
+ WorkersCountLimit->Set(Workers.size());
+ Become(&TDistributor::StateMain);
+}
+
+void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
+ ALS_DEBUG(NKikimrServices::TX_CONVEYOR) << "action=processed;owner=" << ev->Get()->GetOwnerId() << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
+ SolutionsRate->Inc();
+ if (Waiting.size()) {
+ Send(ev->Sender, new TEvInternal::TEvNewTask(Waiting.front()));
+ Waiting.pop_front();
+ } else {
+ Workers.emplace_back(ev->Sender);
+ }
+ if (!*ev->Get()) {
+ Send(ev->Get()->GetOwnerId(), new TEvExecution::TEvTaskProcessedResult(ev->Get()->GetErrorMessage()));
+ } else {
+ Send(ev->Get()->GetOwnerId(), new TEvExecution::TEvTaskProcessedResult(ev->Get()->GetResult()));
+ }
+ WaitingQueueSize->Set(Waiting.size());
+ WorkersCount->Set(Workers.size());
+}
+
+void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
+ ALS_DEBUG(NKikimrServices::TX_CONVEYOR) << "action=add_task;owner=" << ev->Sender << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
+ IncomingRate->Inc();
+ if (Workers.size()) {
+ Send(Workers.back(), new TEvInternal::TEvNewTask(TWorkerTask(ev->Get()->GetTask(), ev->Sender)));
+ Workers.pop_back();
+ } else if (Waiting.size() < Config.GetQueueSizeLimit()) {
+ Waiting.emplace_back(ev->Get()->GetTask(), ev->Sender);
+ } else {
+ Send(ev->Sender, new TEvExecution::TEvTaskProcessedResult("scan conveyor overloaded (" +
+ ::ToString(Waiting.size()) + " > " + ::ToString(Config.GetQueueSizeLimit()) + ")"));
+ }
+ WaitingQueueSize->Set(Waiting.size());
+ WorkersCount->Set(Workers.size());
+}
+
+}
diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h
new file mode 100644
index 00000000000..149cab231af
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/service.h
@@ -0,0 +1,44 @@
+#pragma once
+#include "worker.h"
+#include <ydb/core/tx/conveyor/usage/config.h>
+#include <ydb/core/tx/conveyor/usage/events.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NKikimr::NConveyor {
+
+class TDistributor: public TActorBootstrapped<TDistributor> {
+private:
+ const TConfig Config;
+ const TString ConveyorName = "common";
+ std::vector<TActorId> Workers;
+ std::deque<TWorkerTask> Waiting;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSize;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCount;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCountLimit;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRate;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr SolutionsRate;
+
+ void HandleMain(TEvExecution::TEvNewTask::TPtr& ev);
+ void HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev);
+
+public:
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExecution::TEvNewTask, HandleMain);
+ hFunc(TEvInternal::TEvTaskProcessedResult, HandleMain);
+ default:
+ ALS_ERROR(NKikimrServices::TX_CONVEYOR) << ConveyorName << ": unexpected event for task executor: " << ev->GetTypeRewrite();
+ break;
+ }
+ }
+
+ TDistributor(const TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals);
+
+ void Bootstrap();
+};
+
+NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals);
+
+}
diff --git a/ydb/core/tx/conveyor/service/worker.cpp b/ydb/core/tx/conveyor/service/worker.cpp
new file mode 100644
index 00000000000..caba610a3ac
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/worker.cpp
@@ -0,0 +1,14 @@
+#include "worker.h"
+
+namespace NKikimr::NConveyor {
+
+void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) {
+ auto& workerTask = ev->Get()->GetTask();
+ if (workerTask.GetTask()->Execute()) {
+ TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask.GetOwnerId(), workerTask.GetTask()).SendTo(ev->Sender);
+ } else {
+ TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask.GetOwnerId(), "cannot execute task").SendTo(ev->Sender);
+ }
+}
+
+}
diff --git a/ydb/core/tx/conveyor/service/worker.h b/ydb/core/tx/conveyor/service/worker.h
new file mode 100644
index 00000000000..32b0148adda
--- /dev/null
+++ b/ydb/core/tx/conveyor/service/worker.h
@@ -0,0 +1,90 @@
+#pragma once
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+#include <ydb/core/protos/services.pb.h>
+#include <ydb/services/metadata/request/common.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NKikimr::NConveyor {
+
+class TWorkerTask {
+private:
+ YDB_READONLY_DEF(ITask::TPtr, Task);
+ YDB_READONLY_DEF(NActors::TActorId, OwnerId);
+public:
+ TWorkerTask(ITask::TPtr task, const NActors::TActorId& ownerId)
+ : Task(task)
+ , OwnerId(ownerId) {
+
+ }
+};
+
+struct TEvInternal {
+ enum EEv {
+ EvNewTask = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvTaskProcessedResult,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expected EvEnd < EventSpaceEnd");
+
+ class TEvNewTask: public NActors::TEventLocal<TEvNewTask, EvNewTask> {
+ private:
+ TWorkerTask Task;
+ public:
+ TEvNewTask() = default;
+
+ const TWorkerTask& GetTask() const {
+ return Task;
+ }
+
+ explicit TEvNewTask(const TWorkerTask& task)
+ : Task(task) {
+ }
+ };
+
+ class TEvTaskProcessedResult:
+ public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult>,
+ public NMetadata::NRequest::TMaybeResult<ITask::TPtr> {
+ private:
+ using TBase = NMetadata::NRequest::TMaybeResult<ITask::TPtr>;
+ YDB_READONLY_DEF(NActors::TActorId, OwnerId);
+ public:
+ TEvTaskProcessedResult(const NActors::TActorId& ownerId, const TString& errorMessage)
+ : TBase(errorMessage)
+ , OwnerId(ownerId) {
+
+ }
+ TEvTaskProcessedResult(const NActors::TActorId& ownerId, ITask::TPtr result)
+ : TBase(result)
+ , OwnerId(ownerId) {
+
+ }
+ };
+};
+
+class TWorker: public NActors::TActorBootstrapped<TWorker> {
+private:
+ using TBase = NActors::TActorBootstrapped<TWorker>;
+public:
+ void HandleMain(TEvInternal::TEvNewTask::TPtr& ev);
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvInternal::TEvNewTask, HandleMain);
+ default:
+ ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite();
+ break;
+ }
+ }
+
+ void Bootstrap() {
+ Become(&TWorker::StateMain);
+ }
+};
+
+}
diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/conveyor/usage/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..f18e519b827
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-conveyor-usage)
+target_link_libraries(tx-conveyor-usage PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-metadata-request
+)
+target_sources(tx-conveyor-usage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/service.cpp
+)
diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.linux-aarch64.txt b/ydb/core/tx/conveyor/usage/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..abe9415a545
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-conveyor-usage)
+target_link_libraries(tx-conveyor-usage PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-metadata-request
+)
+target_sources(tx-conveyor-usage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/service.cpp
+)
diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.linux-x86_64.txt b/ydb/core/tx/conveyor/usage/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..abe9415a545
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-conveyor-usage)
+target_link_libraries(tx-conveyor-usage PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-metadata-request
+)
+target_sources(tx-conveyor-usage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/service.cpp
+)
diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.txt b/ydb/core/tx/conveyor/usage/CMakeLists.txt
new file mode 100644
index 00000000000..a692f90f36e
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64")
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.windows-x86_64.txt b/ydb/core/tx/conveyor/usage/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..f18e519b827
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-conveyor-usage)
+target_link_libraries(tx-conveyor-usage PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-metadata-request
+)
+target_sources(tx-conveyor-usage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/service.cpp
+)
diff --git a/ydb/core/tx/conveyor/usage/abstract.cpp b/ydb/core/tx/conveyor/usage/abstract.cpp
new file mode 100644
index 00000000000..42737a8e146
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/abstract.cpp
@@ -0,0 +1,5 @@
+#include "abstract.h"
+
+namespace NKikimr::NConveyor {
+
+}
diff --git a/ydb/core/tx/conveyor/usage/abstract.h b/ydb/core/tx/conveyor/usage/abstract.h
new file mode 100644
index 00000000000..59018093308
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/abstract.h
@@ -0,0 +1,17 @@
+#pragma once
+#include <memory>
+
+namespace NKikimr::NConveyor {
+
+class ITask {
+protected:
+ virtual bool DoExecute() = 0;
+public:
+ using TPtr = std::shared_ptr<ITask>;
+ virtual ~ITask() = default;
+ bool Execute() {
+ return DoExecute();
+ }
+};
+
+}
diff --git a/ydb/core/tx/conveyor/usage/config.cpp b/ydb/core/tx/conveyor/usage/config.cpp
new file mode 100644
index 00000000000..b73683cd2ac
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/config.cpp
@@ -0,0 +1,19 @@
+#include "config.h"
+
+namespace NKikimr::NConveyor {
+
+bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config) {
+ if (!config.HasEnabled()) {
+ EnabledFlag = true;
+ } else {
+ EnabledFlag = config.GetEnabled();
+ }
+ if (config.HasQueueSizeLimit()) {
+ QueueSizeLimit = config.GetQueueSizeLimit();
+ }
+ if (config.HasWorkersCount()) {
+ WorkersCount = config.GetWorkersCount();
+ }
+ return true;
+}
+}
diff --git a/ydb/core/tx/conveyor/usage/config.h b/ydb/core/tx/conveyor/usage/config.h
new file mode 100644
index 00000000000..0afbe53cefe
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/config.h
@@ -0,0 +1,16 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/protos/config.pb.h>
+
+namespace NKikimr::NConveyor {
+
+class TConfig {
+private:
+ YDB_OPT(ui32, WorkersCount);
+ YDB_READONLY(ui32, QueueSizeLimit, 256);
+ YDB_READONLY_FLAG(Enabled, true);
+public:
+ bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config);
+};
+
+}
diff --git a/ydb/core/tx/conveyor/usage/events.cpp b/ydb/core/tx/conveyor/usage/events.cpp
new file mode 100644
index 00000000000..beb3b35860d
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/events.cpp
@@ -0,0 +1,5 @@
+#include "events.h"
+
+namespace NKikimr::NConveyor {
+
+}
diff --git a/ydb/core/tx/conveyor/usage/events.h b/ydb/core/tx/conveyor/usage/events.h
new file mode 100644
index 00000000000..12ab77bd5c6
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/events.h
@@ -0,0 +1,39 @@
+#pragma once
+#include "abstract.h"
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/actors/core/events.h>
+#include <ydb/services/metadata/request/common.h>
+
+namespace NKikimr::NConveyor {
+
+struct TEvExecution {
+ enum EEv {
+ EvNewTask = EventSpaceBegin(TKikimrEvents::ES_CONVEYOR),
+ EvTaskProcessedResult,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CONVEYOR), "expected EvEnd < EventSpaceEnd");
+
+ class TEvNewTask: public NActors::TEventLocal<TEvNewTask, EvNewTask> {
+ private:
+ YDB_READONLY_DEF(ITask::TPtr, Task);
+ public:
+ TEvNewTask() = default;
+
+ explicit TEvNewTask(ITask::TPtr task)
+ : Task(task) {
+ }
+ };
+
+ class TEvTaskProcessedResult:
+ public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult>,
+ public NMetadata::NRequest::TMaybeResult<ITask::TPtr> {
+ private:
+ using TBase = NMetadata::NRequest::TMaybeResult<ITask::TPtr>;
+ public:
+ using TBase::TBase;
+ };
+};
+
+}
diff --git a/ydb/core/tx/conveyor/usage/service.cpp b/ydb/core/tx/conveyor/usage/service.cpp
new file mode 100644
index 00000000000..71416b06880
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/service.cpp
@@ -0,0 +1,17 @@
+#include "service.h"
+
+namespace NKikimr::NConveyor {
+
+bool TServiceOperator::IsEnabled() {
+ return Singleton<TServiceOperator>()->IsEnabledFlag;
+}
+
+void TServiceOperator::Register(const TConfig& serviceConfig) {
+ Singleton<TServiceOperator>()->IsEnabledFlag = serviceConfig.IsEnabled();
+}
+
+NActors::TActorId MakeServiceId(const ui32 nodeId) {
+ return NActors::TActorId(nodeId, "SrvcConveyor");
+}
+
+}
diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h
new file mode 100644
index 00000000000..91d0d797fe4
--- /dev/null
+++ b/ydb/core/tx/conveyor/usage/service.h
@@ -0,0 +1,17 @@
+#pragma once
+#include "config.h"
+#include <library/cpp/actors/core/actorid.h>
+
+namespace NKikimr::NConveyor {
+
+class TServiceOperator {
+private:
+ std::atomic<bool> IsEnabledFlag = false;
+public:
+ static bool IsEnabled();
+ static void Register(const TConfig& serviceConfig);
+};
+
+NActors::TActorId MakeServiceId(const ui32 nodeId);
+
+}
diff --git a/ydb/services/metadata/request/common.h b/ydb/services/metadata/request/common.h
index 6eaaa91a3e9..3f00c3634c7 100644
--- a/ydb/services/metadata/request/common.h
+++ b/ydb/services/metadata/request/common.h
@@ -133,6 +133,11 @@ public:
}
+ TMaybeResult(const TResult& result)
+ : Result(result) {
+
+ }
+
const TResult& operator*() const {
Y_ENSURE(!ErrorMessage, yexception() << "incorrect object for result request");
return Result;