diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-04 19:21:15 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-04 19:21:15 +0300 |
commit | 3f75ec8cd3385bb1b3218186ed863714070c3b47 (patch) | |
tree | 7e3690351d64b4bc30770039f7def5d984b81d0c | |
parent | eacce0097cb55d01d55c0da863f59edd2d9f0d1f (diff) | |
download | ydb-3f75ec8cd3385bb1b3218186ed863714070c3b47.tar.gz |
parallel cs-scan through conveyor service
55 files changed, 904 insertions, 62 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index be982117f31..6b57e420d23 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -160,6 +160,7 @@ struct TKikimrEvents : TEvents { ES_GRPC_CANCELATION, ES_DISCOVERY, ES_EXT_INDEX, + ES_CONVEYOR, }; }; diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt index 211c479be13..cce9e766c78 100644 --- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt @@ -92,6 +92,7 @@ target_link_libraries(run PUBLIC ydb-core-tx core-tx-columnshard core-tx-coordinator + tx-conveyor-service core-tx-datashard core-tx-long_tx_service tx-long_tx_service-public diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt index 4bfff4150d8..6c34ef2660f 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt @@ -93,6 +93,7 @@ target_link_libraries(run PUBLIC ydb-core-tx core-tx-columnshard core-tx-coordinator + tx-conveyor-service core-tx-datashard core-tx-long_tx_service tx-long_tx_service-public diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt index 4bfff4150d8..6c34ef2660f 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt @@ -93,6 +93,7 @@ target_link_libraries(run PUBLIC ydb-core-tx core-tx-columnshard core-tx-coordinator + tx-conveyor-service core-tx-datashard core-tx-long_tx_service tx-long_tx_service-public diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt index 211c479be13..cce9e766c78 100644 --- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt @@ -92,6 +92,7 @@ target_link_libraries(run PUBLIC ydb-core-tx core-tx-columnshard core-tx-coordinator + tx-conveyor-service core-tx-datashard core-tx-long_tx_service tx-long_tx_service-public diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index b584d31ea62..905ac5f22db 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -69,7 +69,8 @@ union TBasicKikimrServicesMask { bool EnableMetadataProvider:1; bool EnableReplicationService:1; bool EnableBackgroundTasks:1; - bool ExternalIndex:1; + bool EnableExternalIndex: 1; + bool EnableConveyor: 1; }; ui64 Raw; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index c7b9d5126e8..9a2200fff7a 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -157,6 +157,10 @@ #include <ydb/services/metadata/ds_table/service.h> #include <ydb/services/metadata/service.h> +#include <ydb/core/tx/conveyor/usage/config.h> +#include <ydb/core/tx/conveyor/service/service.h> +#include <ydb/core/tx/conveyor/usage/service.h> + #include <ydb/services/bg_tasks/ds_table/executor.h> #include <ydb/services/bg_tasks/service.h> #include <ydb/services/ext_index/common/config.h> @@ -2344,6 +2348,28 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu } } +TConveyorInitializer::TConveyorInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) { +} + +void TConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + NConveyor::TConfig serviceConfig; + if (Config.HasConveyorConfig()) { + Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetConveyorConfig())); + } + + if (serviceConfig.IsEnabled()) { + TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); + TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_CONVEYOR"); + + auto service = NConveyor::CreateService(serviceConfig, conveyorGroup); + + setup->LocalServices.push_back(std::make_pair( + NConveyor::MakeServiceId(NodeId), + TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); + } +} + TExternalIndexInitializer::TExternalIndexInitializer(const TKikimrRunConfig& runConfig) : IKikimrServicesInitializer(runConfig) { } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 74c76ab1ecd..832b3861238 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -383,6 +383,12 @@ private: std::shared_ptr<TModuleFactories> Factories; }; +class TConveyorInitializer: public IKikimrServicesInitializer { +public: + TConveyorInitializer(const TKikimrRunConfig& runConfig); + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + class TExternalIndexInitializer: public IKikimrServicesInitializer { public: TExternalIndexInitializer(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 0c28f502738..bfc3a8b623c 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1487,10 +1487,14 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TMetadataProviderInitializer(runConfig)); } - if (serviceMask.ExternalIndex) { + if (serviceMask.EnableExternalIndex) { sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig)); } + if (serviceMask.EnableConveyor) { + sil->AddServiceInitializer(new TConveyorInitializer(runConfig)); + } + if (serviceMask.EnableBackgroundTasks) { sil->AddServiceInitializer(new TBackgroundTasksInitializer(runConfig)); } diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 62041b9d870..36a276e73ad 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -50,6 +50,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { //runtime->SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NActors::NLog::PRI_DEBUG); runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_TRACE); runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG); + runtime->SetLogPriority(NKikimrServices::TX_CONVEYOR, NActors::NLog::PRI_DEBUG); //runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG); //runtime->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG); //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 517864cb64a..b889ef1cfda 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -602,6 +602,12 @@ message TInternalRequestConfig { optional uint32 RetryPeriodFinishSeconds = 2 [default = 30]; } +message TConveyorConfig { + optional bool Enabled = 1 [default = true]; + optional uint32 WorkersCount = 2; + optional uint32 QueueSizeLimit = 3; +} + message TExternalIndexConfig { optional bool Enabled = 1 [default = true]; optional TInternalRequestConfig RequestConfig = 2; @@ -1799,6 +1805,7 @@ message TAppConfig { optional TClientCertificateAuthorization ClientCertificateAuthorization = 62; optional TExternalIndexConfig ExternalIndexConfig = 63; optional bool YamlConfigEnabled = 64; + optional TConveyorConfig ConveyorConfig = 65; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index d1c1c54fedd..feac1edc5b2 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -362,6 +362,7 @@ enum EServiceKikimr { DISCOVERY_CACHE = 1801; EXT_INDEX = 1900; + TX_CONVEYOR = 2000; }; message TActivity { diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt index 55e14f4e7c1..b5ca30b917c 100644 --- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt @@ -86,6 +86,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-datastreams ydb-services-discovery services-ext_index-service + tx-conveyor-service ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt index 90b7724e073..87c8de49023 100644 --- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt +++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt @@ -87,6 +87,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-datastreams ydb-services-discovery services-ext_index-service + tx-conveyor-service ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt index 90b7724e073..87c8de49023 100644 --- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt @@ -87,6 +87,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-datastreams ydb-services-discovery services-ext_index-service + tx-conveyor-service ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt index 55e14f4e7c1..b5ca30b917c 100644 --- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt @@ -86,6 +86,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-datastreams ydb-services-discovery services-ext_index-service + tx-conveyor-service ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 162b35650b8..4c9678c8af8 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -95,6 +95,8 @@ #include <ydb/services/ext_index/common/config.h> #include <ydb/services/ext_index/common/service.h> #include <ydb/services/ext_index/service/executor.h> +#include <ydb/core/tx/conveyor/service/service.h> +#include <ydb/core/tx/conveyor/usage/service.h> #include <ydb/library/folder_service/mock/mock_folder_service.h> #include <ydb/core/client/server/msgbus_server_tracer.h> @@ -712,22 +714,27 @@ namespace Tests { if (Settings->EnableConfigsDispatcher) { auto *dispatcher = NConsole::CreateConfigsDispatcher(Settings->AppConfig, {}); auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); - Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid); + Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableMetadataProvider()) { auto* actor = NMetadata::NProvider::CreateService(NMetadata::NProvider::TConfig()); const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); - Runtime->RegisterService(NMetadata::NProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid); + Runtime->RegisterService(NMetadata::NProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableBackgroundTasks()) { auto* actor = NBackgroundTasks::CreateService(NBackgroundTasks::TConfig()); const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); - Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid); + Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableExternalIndex()) { auto* actor = NCSIndex::CreateService(NCSIndex::TConfig()); const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); - Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid); + Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); + } + { + auto* actor = NConveyor::CreateService(NConveyor::TConfig(), new ::NMonitoring::TDynamicCounters()); + const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); + Runtime->RegisterService(NConveyor::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); diff --git a/ydb/core/tx/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/CMakeLists.darwin-x86_64.txt index e4fe0eea1fc..3589716aac5 100644 --- a/ydb/core/tx/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/CMakeLists.darwin-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(balance_coverage) add_subdirectory(columnshard) +add_subdirectory(conveyor) add_subdirectory(coordinator) add_subdirectory(datashard) add_subdirectory(long_tx_service) diff --git a/ydb/core/tx/CMakeLists.linux-aarch64.txt b/ydb/core/tx/CMakeLists.linux-aarch64.txt index f2f8b8e3e5a..d0bf8cc5773 100644 --- a/ydb/core/tx/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/CMakeLists.linux-aarch64.txt @@ -8,6 +8,7 @@ add_subdirectory(balance_coverage) add_subdirectory(columnshard) +add_subdirectory(conveyor) add_subdirectory(coordinator) add_subdirectory(datashard) add_subdirectory(long_tx_service) diff --git a/ydb/core/tx/CMakeLists.linux-x86_64.txt b/ydb/core/tx/CMakeLists.linux-x86_64.txt index f2f8b8e3e5a..d0bf8cc5773 100644 --- a/ydb/core/tx/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/CMakeLists.linux-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(balance_coverage) add_subdirectory(columnshard) +add_subdirectory(conveyor) add_subdirectory(coordinator) add_subdirectory(datashard) add_subdirectory(long_tx_service) diff --git a/ydb/core/tx/CMakeLists.windows-x86_64.txt b/ydb/core/tx/CMakeLists.windows-x86_64.txt index e4fe0eea1fc..3589716aac5 100644 --- a/ydb/core/tx/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/CMakeLists.windows-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(balance_coverage) add_subdirectory(columnshard) +add_subdirectory(conveyor) add_subdirectory(coordinator) add_subdirectory(datashard) add_subdirectory(long_tx_service) diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 5c35ada84d1..a81d192131f 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -36,6 +36,7 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet_flat tx-columnshard-engines core-tx-tiering + tx-conveyor-usage tx-long_tx_service-public ydb-core-util api-protos diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 19ea4034a03..77e8b4de116 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -37,6 +37,7 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet_flat tx-columnshard-engines core-tx-tiering + tx-conveyor-usage tx-long_tx_service-public ydb-core-util api-protos diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 19ea4034a03..77e8b4de116 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -37,6 +37,7 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet_flat tx-columnshard-engines core-tx-tiering + tx-conveyor-usage tx-long_tx_service-public ydb-core-util api-protos diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 5c35ada84d1..a81d192131f 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -36,6 +36,7 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet_flat tx-columnshard-engines core-tx-tiering + tx-conveyor-usage tx-long_tx_service-public ydb-core-util api-protos diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index d9d29661d2d..4e92a1ebaea 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -30,9 +30,6 @@ using NOlap::TBlobRange; class TColumnShardScanIterator : public TScanIteratorBase { NOlap::TReadMetadata::TConstPtr ReadMetadata; NOlap::TIndexedReadData IndexedData; - THashMap<TBlobRange, ui64> IndexedBlobs; // blobId -> granule - THashSet<TBlobRange> WaitIndexed; - THashMap<ui64, THashSet<TBlobRange>> GranuleBlobs; // granule -> blobs std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted; TVector<TBlobRange> BlobsToRead; ui64 NextBlobIdxToRead = 0; @@ -51,10 +48,10 @@ public: const auto& cmtBlob = ReadMetadata->CommittedBlobs[i]; WaitCommitted.emplace(cmtBlob, batchNo); } - IndexedBlobs = IndexedData.InitRead(batchNo, true); - for (auto& [blobId, granule] : IndexedBlobs) { - WaitIndexed.insert(blobId); - GranuleBlobs[granule].insert(blobId); + auto indexedBlobs = IndexedData.InitRead(batchNo, true); + THashMap<ui64, THashSet<TBlobRange>> granuleBlobs; // granule -> blobs + for (auto& [blobId, granule] : indexedBlobs) { + granuleBlobs[granule].insert(blobId); } // Add cached batches without read @@ -78,21 +75,21 @@ public: // Read all indexed blobs (in correct order) auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); for (ui64 granule : granulesOrder) { - auto& blobs = GranuleBlobs[granule]; + auto& blobs = granuleBlobs[granule]; BlobsToRead.insert(BlobsToRead.end(), blobs.begin(), blobs.end()); } IsReadFinished = ReadMetadata->Empty(); } - void AddData(const TBlobRange& blobRange, TString data) override { + virtual void Apply(IDataPreparationTask::TPtr task) override { + task->Apply(IndexedData); + } + + void AddData(const TBlobRange& blobRange, TString data, IDataTasksProcessor::TPtr processor) override { const auto& blobId = blobRange.BlobId; - if (IndexedBlobs.count(blobRange)) { - if (!WaitIndexed.count(blobRange)) { - return; // ignore duplicate parts - } - WaitIndexed.erase(blobRange); - IndexedData.AddIndexed(blobRange, data); + if (IndexedData.IsIndexedBlob(blobRange)) { + IndexedData.AddIndexed(blobRange, data, processor); } else { auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0}); if (cmt.empty()) { @@ -157,11 +154,11 @@ private: if (limitLeft == 0) { WaitCommitted.clear(); - WaitIndexed.clear(); + IndexedData.ForceFinishWaiting(); IsReadFinished = true; } - if (WaitCommitted.empty() && WaitIndexed.empty() && NextBlobIdxToRead == BlobsToRead.size()) { + if (WaitCommitted.empty() && !IndexedData.HasWaitIndexed() && NextBlobIdxToRead == BlobsToRead.size()) { IsReadFinished = true; } } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 1172dc2171d..da8da9ee25e 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -10,22 +10,25 @@ #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/core/kqp/compute_actor/kqp_compute_events.h> #include <ydb/core/actorlib_impl/long_timer.h> +#include <ydb/core/tx/conveyor/usage/service.h> +#include <ydb/core/tx/conveyor/usage/events.h> #include <ydb/library/yql/core/issue/yql_issue.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/services/metadata/request/common.h> namespace NKikimr::NColumnShard { using namespace NKqp; using NBlobCache::TBlobRange; -class TTxScan : public TTxReadBase { +class TTxScan: public TTxReadBase { public: using TReadMetadataPtr = NOlap::TReadMetadataBase::TConstPtr; TTxScan(TColumnShard* self, TEvColumnShard::TEvScan::TPtr& ev) : TTxReadBase(self) - , Ev(ev) - {} + , Ev(ev) { + } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; void Complete(const TActorContext& ctx) override; @@ -42,10 +45,25 @@ private: constexpr ui64 INIT_BATCH_ROWS = 1000; -constexpr i64 DEFAULT_READ_AHEAD_BYTES = 100*1024*1024; +constexpr i64 DEFAULT_READ_AHEAD_BYTES = 100 * 1024 * 1024; constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(10); constexpr TDuration SCAN_HARD_TIMEOUT_GAP = TDuration::Seconds(5); +class TLocalDataTasksProcessor: public IDataTasksProcessor { +private: + const TActorIdentity OwnerActorId; +protected: + virtual bool DoAdd(IDataPreparationTask::TPtr task) override { + OwnerActorId.Send(NConveyor::MakeServiceId(OwnerActorId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task)); + return true; + } +public: + TLocalDataTasksProcessor(const TActorIdentity& ownerActorId) + : OwnerActorId(ownerActorId) + { + } +}; + class TColumnShardScan : public TActorBootstrapped<TColumnShardScan>, NArrow::IRowWriter { public: static constexpr auto ActorActivityType() { @@ -84,6 +102,9 @@ public: // propagate self actor id // TODO: FlagSubscribeOnSession ? Send(ScanComputeActorId, new TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen), IEventHandle::FlagTrackDelivery); + if (NConveyor::TServiceOperator::IsEnabled()) { + DataTasksProcessor = std::make_shared<TLocalDataTasksProcessor>(SelfId()); + } Become(&TColumnShardScan::StateScan); } @@ -96,6 +117,7 @@ private: hFunc(TEvKqp::TEvAbortExecution, HandleScan); hFunc(TEvents::TEvUndelivered, HandleScan); hFunc(TEvents::TEvWakeup, HandleScan); + hFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, HandleScan); default: Y_FAIL("TColumnShardScan: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); } @@ -135,7 +157,23 @@ private: return true; } + void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN) << SelfId(); + Stats.TaskProcessed(); + if (ev->Get()->GetErrorMessage()) { + DataTasksProcessor->Stop(); + SendScanError(ev->Get()->GetErrorMessage()); + Finish(); + } else { + auto t = dynamic_pointer_cast<IDataPreparationTask>(ev->Get()->GetResult()); + Y_VERIFY(t); + ScanIterator->Apply(t); + } + ContinueProcessing(); + } + void HandleScan(TEvKqpCompute::TEvScanDataAck::TPtr& ev) { + Stats.TaskProcessed(); LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId @@ -156,6 +194,7 @@ private: } void HandleScan(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) { + auto g = Stats.MakeGuard("EvResult"); LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " blobs response:" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); @@ -188,7 +227,10 @@ private: << " prevFreeSpace: " << PeerFreeSpace); if (ScanIterator) { - ScanIterator->AddData(blobRange, event.Data); + { + auto g = Stats.MakeGuard("AddData"); + ScanIterator->AddData(blobRange, event.Data, DataTasksProcessor); + } ContinueProcessing(); } } @@ -324,6 +366,7 @@ private: // * or there is an in-flight blob read or ScanData message for which // we will get a reply and will be able to proceed futher if (!ScanIterator || InFlightScanDataMessages != 0 || InFlightReads != 0) { + Stats.StartWait(); return; } } @@ -332,6 +375,7 @@ private: LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " is hanging" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + Stats.StartWait(); } void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) { @@ -391,6 +435,7 @@ private: } void NextReadMetadata() { + auto g = Stats.MakeGuard("NextReadMetadata"); ScanIterator.reset(); ++ReadMetadataIndex; @@ -561,6 +606,8 @@ private: i64 InFlightScanDataMessages = 0; bool Finished = false; + IDataTasksProcessor::TPtr DataTasksProcessor; + class TBlobStats { private: ui64 PartsCount = 0; @@ -598,11 +645,25 @@ private: TBlobStats CacheBlobs; TBlobStats MissBlobs; THashMap<TString, TDuration> GuardedDurations; + THashMap<TString, TInstant> StartGuards; + std::optional<TInstant> FirstReceived; + TInstant LastWaitStart; + TDuration WaitReceive; public: + void StartWait() { + Y_VERIFY_DEBUG(!LastWaitStart); + LastWaitStart = Now(); + } + TString DebugString() const { TStringBuilder sb; sb << "SCAN_STATS;"; + sb << "start=" << StartInstant << ";"; + if (FirstReceived) { + sb << "frw=" << *FirstReceived - StartInstant << ";"; + } + sb << "srw=" << WaitReceive << ";"; sb << "d=" << FinishInstant - StartInstant << ";"; if (RequestsCount) { sb << "req:{count=" << RequestsCount << ";bytes=" << RequestedBytes << ";bytes_avg=" << RequestedBytes / RequestsCount << "};"; @@ -612,7 +673,12 @@ private: sb << "NO_REQUESTS;"; } for (auto&& i : GuardedDurations) { - sb << i.first << "=" << i.second << ";"; + auto it = StartGuards.find(i.first); + TDuration delta; + if (it != StartGuards.end()) { + delta = Now() - it->second; + } + sb << i.first << "=" << i.second + delta << ";"; } return sb; } @@ -621,17 +687,18 @@ private: private: TScanStats& Owner; const TInstant Start = Now(); - TString SectionName; + const TString SectionName; public: TGuard(const TString& sectionName, TScanStats& owner) : Owner(owner) , SectionName(sectionName) { - + Y_VERIFY(Owner.StartGuards.emplace(SectionName, Start).second); } ~TGuard() { Owner.GuardedDurations[SectionName] += Now() - Start; + Owner.StartGuards.erase(SectionName); } }; @@ -648,7 +715,21 @@ private: } } + void TaskProcessed() { + if (LastWaitStart) { + WaitReceive += Now() - LastWaitStart; + LastWaitStart = TInstant::Zero(); + } + } + void BlobReceived(const NBlobCache::TBlobRange& br, const bool fromCache, const TInstant replyInstant) { + if (!FirstReceived) { + FirstReceived = Now(); + } + if (LastWaitStart) { + WaitReceive += Now() - LastWaitStart; + LastWaitStart = TInstant::Zero(); + } auto it = StartBlobRequest.find(br); Y_VERIFY(it != StartBlobRequest.end()); const TDuration d = replyInstant - it->second; diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 1447d494c18..f462aedc469 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -9,7 +9,10 @@ class TScanIteratorBase { public: virtual ~TScanIteratorBase() = default; - virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/) {} + virtual void Apply(IDataPreparationTask::TPtr /*processor*/) { + + } + virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/, IDataTasksProcessor::TPtr /*processor*/) {} virtual bool Finished() const = 0; virtual NOlap::TPartialReadResult GetBatch() = 0; virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 4466b546a73..050344eee48 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -114,6 +114,33 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v } +bool TIndexedReadData::TAssembledNotFiltered::DoExecuteImpl() { + /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey. + /// It's not OK to apply predicate before replacing key duplicates otherwise. + /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here + auto filtered = NOlap::FilterPortion(Batch, *ReadMetadata); + if (filtered.Batch) { + Y_VERIFY(filtered.Valid()); + filtered.ApplyFilter(); + } +#if 1 // optimization + if (filtered.Batch && ReadMetadata->Program && AllowEarlyFilter) { + filtered = NOlap::EarlyFilter(filtered.Batch, ReadMetadata->Program); + } + if (filtered.Batch) { + Y_VERIFY(filtered.Valid()); + filtered.ApplyFilter(); + } +#endif + FilteredBatch = filtered.Batch; + return true; +} + +bool TIndexedReadData::TAssembledNotFiltered::DoApply(TIndexedReadData& owner) const { + owner.PortionFinished(BatchNo, FilteredBatch); + return true; +} + std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan() const { return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this()); } @@ -218,27 +245,34 @@ THashMap<TBlobRange, ui64> TIndexedReadData::InitRead(ui32 inputBatch, bool inGr return out; } -void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& column) { +void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data, NColumnShard::IDataTasksProcessor::TPtr processor) { Y_VERIFY(IndexedBlobs.count(blobRange)); ui32 batchNo = IndexedBlobs[blobRange]; if (!WaitIndexed.count(batchNo)) { return; } auto& waitingFor = WaitIndexed[batchNo]; - waitingFor.erase(blobRange); + if (waitingFor.erase(blobRange) != 1) { + return; + } - Data[blobRange] = column; + Data[blobRange] = data; if (waitingFor.empty()) { - WaitIndexed.erase(batchNo); - if (auto batch = AssembleIndexedBatch(batchNo)) { - Indexed[batchNo] = batch; + if (auto batch = AssembleIndexedBatch(batchNo, processor)) { + if (processor) { + processor->Add(batch); + } else { + batch->Execute(); + batch->Apply(*this); + } + } else { + WaitIndexed.erase(batchNo); } - UpdateGranuleWaits(batchNo); } } -std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32 batchNo) { +NColumnShard::IDataPreparationTask::TPtr TIndexedReadData::AssembleIndexedBatch(ui32 batchNo, NColumnShard::IDataTasksProcessor::TPtr processor) { auto& portionInfo = Portion(batchNo); Y_VERIFY(portionInfo.Produced()); @@ -250,24 +284,7 @@ std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32 Data.erase(blobRange); } - /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey. - /// It's not OK to apply predicate before replacing key duplicates otherwise. - /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here - auto filtered = NOlap::FilterPortion(batch, *ReadMetadata); - if (filtered.Batch) { - Y_VERIFY(filtered.Valid()); - filtered.ApplyFilter(); - } -#if 1 // optimization - if (filtered.Batch && ReadMetadata->Program && portionInfo.AllowEarlyFilter()) { - filtered = NOlap::EarlyFilter(filtered.Batch, ReadMetadata->Program); - } - if (filtered.Batch) { - Y_VERIFY(filtered.Valid()); - filtered.ApplyFilter(); - } -#endif - return filtered.Batch; + return std::make_shared<TAssembledNotFiltered>(batch, ReadMetadata, batchNo, portionInfo.AllowEarlyFilter(), processor); } void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) { @@ -590,4 +607,23 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco return out; } +void TIndexedReadData::PortionFinished(const ui32 batchNo, std::shared_ptr<arrow::RecordBatch> batch) { + WaitIndexed.erase(batchNo); + if (batch) { + Y_VERIFY(Indexed.emplace(batchNo, batch).second); + } + UpdateGranuleWaits(batchNo); +} + +} + +namespace NKikimr::NColumnShard { + +bool IDataPreparationTask::DoExecute() { + if (OwnerOperator && OwnerOperator->IsStopped()) { + return true; + } else { + return DoExecuteImpl(); + } +} } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index bc6764dd0b3..b815aa216b7 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -3,12 +3,62 @@ #include "column_engine.h" #include "column_engine_logs.h" // for TColumnEngineForLogs::TMark #include "predicate.h" +#include <ydb/core/tx/conveyor/usage/abstract.h> namespace NKikimr::NColumnShard { class TScanIteratorBase; } namespace NKikimr::NOlap { +class TIndexedReadData; +} + +namespace NKikimr::NColumnShard { + +class IDataTasksProcessor; + +class IDataPreparationTask: public NConveyor::ITask { +private: + std::shared_ptr<IDataTasksProcessor> OwnerOperator; +protected: + virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0; + virtual bool DoExecuteImpl() = 0; + + virtual bool DoExecute() override final; +public: + IDataPreparationTask(std::shared_ptr<IDataTasksProcessor> ownerOperator) + : OwnerOperator(ownerOperator) + { + + } + using TPtr = std::shared_ptr<IDataPreparationTask>; + virtual ~IDataPreparationTask() = default; + bool Apply(NOlap::TIndexedReadData& indexedDataRead) const { + return DoApply(indexedDataRead); + } +}; + +class IDataTasksProcessor { +protected: + virtual bool DoAdd(IDataPreparationTask::TPtr task) = 0; + std::atomic<bool> Stopped = false; +public: + void Stop() { + Stopped = true; + } + bool IsStopped() const { + return Stopped; + } + + using TPtr = std::shared_ptr<IDataTasksProcessor>; + virtual ~IDataTasksProcessor() = default; + bool Add(IDataPreparationTask::TPtr task) { + return DoAdd(task); + } +}; +} + +namespace NKikimr::NOlap { struct TReadStats { TInstant BeginTimestamp; @@ -207,16 +257,50 @@ public: NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId); } - void AddIndexed(const TBlobRange& blobRange, const TString& column); + void AddIndexed(const TBlobRange& blobRange, const TString& column, NColumnShard::IDataTasksProcessor::TPtr processor); size_t NumPortions() const { return PortionBatch.size(); } bool HasIndexRead() const { return WaitIndexed.size() || Indexed.size(); } - + bool IsIndexedBlob(const TBlobRange& blobRange) const { + return IndexedBlobs.contains(blobRange); + } + void ForceFinishWaiting() { + WaitIndexed.clear(); + } + bool HasWaitIndexed() const { + return WaitIndexed.size(); + } private: NOlap::TReadMetadata::TConstPtr ReadMetadata; ui32 FirstIndexedBatch{0}; THashMap<TBlobRange, TString> Data; std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; THashMap<ui32, std::shared_ptr<arrow::RecordBatch>> Indexed; + + class TAssembledNotFiltered: public NColumnShard::IDataPreparationTask { + private: + using TBase = NColumnShard::IDataPreparationTask; + std::shared_ptr<arrow::RecordBatch> Batch; + std::shared_ptr<arrow::RecordBatch> FilteredBatch; + NOlap::TReadMetadata::TConstPtr ReadMetadata; + ui32 BatchNo = 0; + bool AllowEarlyFilter = false; + protected: + virtual bool DoApply(TIndexedReadData& owner) const override; + virtual bool DoExecuteImpl() override; + public: + TAssembledNotFiltered(std::shared_ptr<arrow::RecordBatch> batch, NOlap::TReadMetadata::TConstPtr readMetadata, + const ui32 batchNo, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor) + : TBase(processor) + , Batch(batch) + , ReadMetadata(readMetadata) + , BatchNo(batchNo) + , AllowEarlyFilter(allowEarlyFilter) + { + + } + }; + void PortionFinished(const ui32 batchNo, std::shared_ptr<arrow::RecordBatch> batch); + THashMap<ui32, THashSet<TBlobRange>> WaitIndexed; THashMap<TBlobRange, ui32> IndexedBlobs; // blobId -> batchNo ui32 ReadyNotIndexed{0}; @@ -251,7 +335,8 @@ private: std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch( const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const; - std::shared_ptr<arrow::RecordBatch> AssembleIndexedBatch(ui32 batchNo); + + NColumnShard::IDataPreparationTask::TPtr AssembleIndexedBatch(ui32 batchNo, NColumnShard::IDataTasksProcessor::TPtr processor); void UpdateGranuleWaits(ui32 batchNo); std::shared_ptr<arrow::RecordBatch> MergeNotIndexed( std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const; diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 11b053aea16..7a0d8095d07 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -51,7 +51,7 @@ public: return; // ignore duplicate parts } WaitIndexed.erase(event.BlobRange); - IndexedData.AddIndexed(event.BlobRange, event.Data); + IndexedData.AddIndexed(event.BlobRange, event.Data, nullptr); } else if (CommittedBlobs.count(blobId)) { auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0}); if (cmt.empty()) { diff --git a/ydb/core/tx/conveyor/CMakeLists.txt b/ydb/core/tx/conveyor/CMakeLists.txt new file mode 100644 index 00000000000..57251a735fa --- /dev/null +++ b/ydb/core/tx/conveyor/CMakeLists.txt @@ -0,0 +1,10 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(service) +add_subdirectory(usage) diff --git a/ydb/core/tx/conveyor/service/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/conveyor/service/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..54a440d2794 --- /dev/null +++ b/ydb/core/tx/conveyor/service/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-conveyor-service) +target_link_libraries(tx-conveyor-service PUBLIC + contrib-libs-cxxsupp + yutil + tx-conveyor-usage + ydb-core-protos +) +target_sources(tx-conveyor-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/worker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/service.cpp +) diff --git a/ydb/core/tx/conveyor/service/CMakeLists.linux-aarch64.txt b/ydb/core/tx/conveyor/service/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..b862166c4cf --- /dev/null +++ b/ydb/core/tx/conveyor/service/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-conveyor-service) +target_link_libraries(tx-conveyor-service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + tx-conveyor-usage + ydb-core-protos +) +target_sources(tx-conveyor-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/worker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/service.cpp +) diff --git a/ydb/core/tx/conveyor/service/CMakeLists.linux-x86_64.txt b/ydb/core/tx/conveyor/service/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..b862166c4cf --- /dev/null +++ b/ydb/core/tx/conveyor/service/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-conveyor-service) +target_link_libraries(tx-conveyor-service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + tx-conveyor-usage + ydb-core-protos +) +target_sources(tx-conveyor-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/worker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/service.cpp +) diff --git a/ydb/core/tx/conveyor/service/CMakeLists.txt b/ydb/core/tx/conveyor/service/CMakeLists.txt new file mode 100644 index 00000000000..a692f90f36e --- /dev/null +++ b/ydb/core/tx/conveyor/service/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64") + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/conveyor/service/CMakeLists.windows-x86_64.txt b/ydb/core/tx/conveyor/service/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..54a440d2794 --- /dev/null +++ b/ydb/core/tx/conveyor/service/CMakeLists.windows-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-conveyor-service) +target_link_libraries(tx-conveyor-service PUBLIC + contrib-libs-cxxsupp + yutil + tx-conveyor-usage + ydb-core-protos +) +target_sources(tx-conveyor-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/worker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/service/service.cpp +) diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp new file mode 100644 index 00000000000..710f372276b --- /dev/null +++ b/ydb/core/tx/conveyor/service/service.cpp @@ -0,0 +1,66 @@ +#include "service.h" +#include <ydb/core/tx/conveyor/usage/service.h> +#include <ydb/core/kqp/query_data/kqp_predictor.h> + +namespace NKikimr::NConveyor { + +NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) { + return new TDistributor(config, "common", conveyorSignals); +} + +TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals) + : Config(config) + , ConveyorName(conveyorName) + , WaitingQueueSize(conveyorSignals->GetCounter("WaitingQueueSize")) + , WorkersCount(conveyorSignals->GetCounter("WorkersCount")) + , WorkersCountLimit(conveyorSignals->GetCounter("WorkersCountLimit")) + , IncomingRate(conveyorSignals->GetCounter("Incoming", true)) + , SolutionsRate(conveyorSignals->GetCounter("Solved", true)) { + +} + +void TDistributor::Bootstrap() { + ALS_NOTICE(NKikimrServices::TX_CONVEYOR) << "conveyor registered: " << SelfId(); + TServiceOperator::Register(Config); + for (ui32 i = 0; i < Config.GetWorkersCountDef(NKqp::TStagePredictor::GetUsableThreads()); ++i) { + Workers.emplace_back(Register(new TWorker())); + } + WorkersCountLimit->Set(Workers.size()); + Become(&TDistributor::StateMain); +} + +void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) { + ALS_DEBUG(NKikimrServices::TX_CONVEYOR) << "action=processed;owner=" << ev->Get()->GetOwnerId() << ";workers=" << Workers.size() << ";waiting=" << Waiting.size(); + SolutionsRate->Inc(); + if (Waiting.size()) { + Send(ev->Sender, new TEvInternal::TEvNewTask(Waiting.front())); + Waiting.pop_front(); + } else { + Workers.emplace_back(ev->Sender); + } + if (!*ev->Get()) { + Send(ev->Get()->GetOwnerId(), new TEvExecution::TEvTaskProcessedResult(ev->Get()->GetErrorMessage())); + } else { + Send(ev->Get()->GetOwnerId(), new TEvExecution::TEvTaskProcessedResult(ev->Get()->GetResult())); + } + WaitingQueueSize->Set(Waiting.size()); + WorkersCount->Set(Workers.size()); +} + +void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) { + ALS_DEBUG(NKikimrServices::TX_CONVEYOR) << "action=add_task;owner=" << ev->Sender << ";workers=" << Workers.size() << ";waiting=" << Waiting.size(); + IncomingRate->Inc(); + if (Workers.size()) { + Send(Workers.back(), new TEvInternal::TEvNewTask(TWorkerTask(ev->Get()->GetTask(), ev->Sender))); + Workers.pop_back(); + } else if (Waiting.size() < Config.GetQueueSizeLimit()) { + Waiting.emplace_back(ev->Get()->GetTask(), ev->Sender); + } else { + Send(ev->Sender, new TEvExecution::TEvTaskProcessedResult("scan conveyor overloaded (" + + ::ToString(Waiting.size()) + " > " + ::ToString(Config.GetQueueSizeLimit()) + ")")); + } + WaitingQueueSize->Set(Waiting.size()); + WorkersCount->Set(Workers.size()); +} + +} diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h new file mode 100644 index 00000000000..149cab231af --- /dev/null +++ b/ydb/core/tx/conveyor/service/service.h @@ -0,0 +1,44 @@ +#pragma once +#include "worker.h" +#include <ydb/core/tx/conveyor/usage/config.h> +#include <ydb/core/tx/conveyor/usage/events.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NKikimr::NConveyor { + +class TDistributor: public TActorBootstrapped<TDistributor> { +private: + const TConfig Config; + const TString ConveyorName = "common"; + std::vector<TActorId> Workers; + std::deque<TWorkerTask> Waiting; + const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSize; + const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCount; + const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCountLimit; + const ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRate; + const ::NMonitoring::TDynamicCounters::TCounterPtr SolutionsRate; + + void HandleMain(TEvExecution::TEvNewTask::TPtr& ev); + void HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev); + +public: + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExecution::TEvNewTask, HandleMain); + hFunc(TEvInternal::TEvTaskProcessedResult, HandleMain); + default: + ALS_ERROR(NKikimrServices::TX_CONVEYOR) << ConveyorName << ": unexpected event for task executor: " << ev->GetTypeRewrite(); + break; + } + } + + TDistributor(const TConfig& config, const TString& conveyorName, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals); + + void Bootstrap(); +}; + +NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorSignals); + +} diff --git a/ydb/core/tx/conveyor/service/worker.cpp b/ydb/core/tx/conveyor/service/worker.cpp new file mode 100644 index 00000000000..caba610a3ac --- /dev/null +++ b/ydb/core/tx/conveyor/service/worker.cpp @@ -0,0 +1,14 @@ +#include "worker.h" + +namespace NKikimr::NConveyor { + +void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) { + auto& workerTask = ev->Get()->GetTask(); + if (workerTask.GetTask()->Execute()) { + TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask.GetOwnerId(), workerTask.GetTask()).SendTo(ev->Sender); + } else { + TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask.GetOwnerId(), "cannot execute task").SendTo(ev->Sender); + } +} + +} diff --git a/ydb/core/tx/conveyor/service/worker.h b/ydb/core/tx/conveyor/service/worker.h new file mode 100644 index 00000000000..32b0148adda --- /dev/null +++ b/ydb/core/tx/conveyor/service/worker.h @@ -0,0 +1,90 @@ +#pragma once +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/conveyor/usage/abstract.h> +#include <ydb/core/protos/services.pb.h> +#include <ydb/services/metadata/request/common.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/hfunc.h> + +namespace NKikimr::NConveyor { + +class TWorkerTask { +private: + YDB_READONLY_DEF(ITask::TPtr, Task); + YDB_READONLY_DEF(NActors::TActorId, OwnerId); +public: + TWorkerTask(ITask::TPtr task, const NActors::TActorId& ownerId) + : Task(task) + , OwnerId(ownerId) { + + } +}; + +struct TEvInternal { + enum EEv { + EvNewTask = EventSpaceBegin(TEvents::ES_PRIVATE), + EvTaskProcessedResult, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expected EvEnd < EventSpaceEnd"); + + class TEvNewTask: public NActors::TEventLocal<TEvNewTask, EvNewTask> { + private: + TWorkerTask Task; + public: + TEvNewTask() = default; + + const TWorkerTask& GetTask() const { + return Task; + } + + explicit TEvNewTask(const TWorkerTask& task) + : Task(task) { + } + }; + + class TEvTaskProcessedResult: + public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult>, + public NMetadata::NRequest::TMaybeResult<ITask::TPtr> { + private: + using TBase = NMetadata::NRequest::TMaybeResult<ITask::TPtr>; + YDB_READONLY_DEF(NActors::TActorId, OwnerId); + public: + TEvTaskProcessedResult(const NActors::TActorId& ownerId, const TString& errorMessage) + : TBase(errorMessage) + , OwnerId(ownerId) { + + } + TEvTaskProcessedResult(const NActors::TActorId& ownerId, ITask::TPtr result) + : TBase(result) + , OwnerId(ownerId) { + + } + }; +}; + +class TWorker: public NActors::TActorBootstrapped<TWorker> { +private: + using TBase = NActors::TActorBootstrapped<TWorker>; +public: + void HandleMain(TEvInternal::TEvNewTask::TPtr& ev); + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvInternal::TEvNewTask, HandleMain); + default: + ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite(); + break; + } + } + + void Bootstrap() { + Become(&TWorker::StateMain); + } +}; + +} diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/conveyor/usage/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..f18e519b827 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-conveyor-usage) +target_link_libraries(tx-conveyor-usage PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-metadata-request +) +target_sources(tx-conveyor-usage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/service.cpp +) diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.linux-aarch64.txt b/ydb/core/tx/conveyor/usage/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..abe9415a545 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-conveyor-usage) +target_link_libraries(tx-conveyor-usage PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-metadata-request +) +target_sources(tx-conveyor-usage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/service.cpp +) diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.linux-x86_64.txt b/ydb/core/tx/conveyor/usage/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..abe9415a545 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/CMakeLists.linux-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-conveyor-usage) +target_link_libraries(tx-conveyor-usage PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-metadata-request +) +target_sources(tx-conveyor-usage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/service.cpp +) diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.txt b/ydb/core/tx/conveyor/usage/CMakeLists.txt new file mode 100644 index 00000000000..a692f90f36e --- /dev/null +++ b/ydb/core/tx/conveyor/usage/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64") + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/conveyor/usage/CMakeLists.windows-x86_64.txt b/ydb/core/tx/conveyor/usage/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..f18e519b827 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/CMakeLists.windows-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-conveyor-usage) +target_link_libraries(tx-conveyor-usage PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-metadata-request +) +target_sources(tx-conveyor-usage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/conveyor/usage/service.cpp +) diff --git a/ydb/core/tx/conveyor/usage/abstract.cpp b/ydb/core/tx/conveyor/usage/abstract.cpp new file mode 100644 index 00000000000..42737a8e146 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/abstract.cpp @@ -0,0 +1,5 @@ +#include "abstract.h" + +namespace NKikimr::NConveyor { + +} diff --git a/ydb/core/tx/conveyor/usage/abstract.h b/ydb/core/tx/conveyor/usage/abstract.h new file mode 100644 index 00000000000..59018093308 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/abstract.h @@ -0,0 +1,17 @@ +#pragma once +#include <memory> + +namespace NKikimr::NConveyor { + +class ITask { +protected: + virtual bool DoExecute() = 0; +public: + using TPtr = std::shared_ptr<ITask>; + virtual ~ITask() = default; + bool Execute() { + return DoExecute(); + } +}; + +} diff --git a/ydb/core/tx/conveyor/usage/config.cpp b/ydb/core/tx/conveyor/usage/config.cpp new file mode 100644 index 00000000000..b73683cd2ac --- /dev/null +++ b/ydb/core/tx/conveyor/usage/config.cpp @@ -0,0 +1,19 @@ +#include "config.h" + +namespace NKikimr::NConveyor { + +bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config) { + if (!config.HasEnabled()) { + EnabledFlag = true; + } else { + EnabledFlag = config.GetEnabled(); + } + if (config.HasQueueSizeLimit()) { + QueueSizeLimit = config.GetQueueSizeLimit(); + } + if (config.HasWorkersCount()) { + WorkersCount = config.GetWorkersCount(); + } + return true; +} +} diff --git a/ydb/core/tx/conveyor/usage/config.h b/ydb/core/tx/conveyor/usage/config.h new file mode 100644 index 00000000000..0afbe53cefe --- /dev/null +++ b/ydb/core/tx/conveyor/usage/config.h @@ -0,0 +1,16 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/protos/config.pb.h> + +namespace NKikimr::NConveyor { + +class TConfig { +private: + YDB_OPT(ui32, WorkersCount); + YDB_READONLY(ui32, QueueSizeLimit, 256); + YDB_READONLY_FLAG(Enabled, true); +public: + bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config); +}; + +} diff --git a/ydb/core/tx/conveyor/usage/events.cpp b/ydb/core/tx/conveyor/usage/events.cpp new file mode 100644 index 00000000000..beb3b35860d --- /dev/null +++ b/ydb/core/tx/conveyor/usage/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NConveyor { + +} diff --git a/ydb/core/tx/conveyor/usage/events.h b/ydb/core/tx/conveyor/usage/events.h new file mode 100644 index 00000000000..12ab77bd5c6 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/events.h @@ -0,0 +1,39 @@ +#pragma once +#include "abstract.h" +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/core/events.h> +#include <ydb/services/metadata/request/common.h> + +namespace NKikimr::NConveyor { + +struct TEvExecution { + enum EEv { + EvNewTask = EventSpaceBegin(TKikimrEvents::ES_CONVEYOR), + EvTaskProcessedResult, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CONVEYOR), "expected EvEnd < EventSpaceEnd"); + + class TEvNewTask: public NActors::TEventLocal<TEvNewTask, EvNewTask> { + private: + YDB_READONLY_DEF(ITask::TPtr, Task); + public: + TEvNewTask() = default; + + explicit TEvNewTask(ITask::TPtr task) + : Task(task) { + } + }; + + class TEvTaskProcessedResult: + public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult>, + public NMetadata::NRequest::TMaybeResult<ITask::TPtr> { + private: + using TBase = NMetadata::NRequest::TMaybeResult<ITask::TPtr>; + public: + using TBase::TBase; + }; +}; + +} diff --git a/ydb/core/tx/conveyor/usage/service.cpp b/ydb/core/tx/conveyor/usage/service.cpp new file mode 100644 index 00000000000..71416b06880 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/service.cpp @@ -0,0 +1,17 @@ +#include "service.h" + +namespace NKikimr::NConveyor { + +bool TServiceOperator::IsEnabled() { + return Singleton<TServiceOperator>()->IsEnabledFlag; +} + +void TServiceOperator::Register(const TConfig& serviceConfig) { + Singleton<TServiceOperator>()->IsEnabledFlag = serviceConfig.IsEnabled(); +} + +NActors::TActorId MakeServiceId(const ui32 nodeId) { + return NActors::TActorId(nodeId, "SrvcConveyor"); +} + +} diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h new file mode 100644 index 00000000000..91d0d797fe4 --- /dev/null +++ b/ydb/core/tx/conveyor/usage/service.h @@ -0,0 +1,17 @@ +#pragma once +#include "config.h" +#include <library/cpp/actors/core/actorid.h> + +namespace NKikimr::NConveyor { + +class TServiceOperator { +private: + std::atomic<bool> IsEnabledFlag = false; +public: + static bool IsEnabled(); + static void Register(const TConfig& serviceConfig); +}; + +NActors::TActorId MakeServiceId(const ui32 nodeId); + +} diff --git a/ydb/services/metadata/request/common.h b/ydb/services/metadata/request/common.h index 6eaaa91a3e9..3f00c3634c7 100644 --- a/ydb/services/metadata/request/common.h +++ b/ydb/services/metadata/request/common.h @@ -133,6 +133,11 @@ public: } + TMaybeResult(const TResult& result) + : Result(result) { + + } + const TResult& operator*() const { Y_ENSURE(!ErrorMessage, yexception() << "incorrect object for result request"); return Result; |