aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-11-07 16:34:06 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-11-07 16:34:06 +0300
commit964584075fe8d2284078222e1e7e2e424c8bd9a1 (patch)
tree317d2d43c895c729d14d2d601d772e7332348f9d
parent61143d004cc8a41d5ee751902baf8b47936aa910 (diff)
downloadydb-964584075fe8d2284078222e1e7e2e424c8bd9a1.tar.gz
add background tasks service
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.txt2
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp23
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h6
-rw-r--r--ydb/core/protos/config.proto11
-rw-r--r--ydb/core/testlib/CMakeLists.txt2
-rw-r--r--ydb/core/testlib/test_client.cpp7
-rw-r--r--ydb/core/testlib/test_client.h1
-rw-r--r--ydb/core/tx/tiering/decoder.cpp28
-rw-r--r--ydb/core/tx/tiering/decoder.h2
-rw-r--r--ydb/services/CMakeLists.txt1
-rw-r--r--ydb/services/bg_tasks/CMakeLists.txt25
-rw-r--r--ydb/services/bg_tasks/abstract/CMakeLists.txt27
-rw-r--r--ydb/services/bg_tasks/abstract/activity.cpp5
-rw-r--r--ydb/services/bg_tasks/abstract/activity.h55
-rw-r--r--ydb/services/bg_tasks/abstract/common.cpp5
-rw-r--r--ydb/services/bg_tasks/abstract/common.h27
-rw-r--r--ydb/services/bg_tasks/abstract/interface.cpp5
-rw-r--r--ydb/services/bg_tasks/abstract/interface.h290
-rw-r--r--ydb/services/bg_tasks/abstract/scheduler.cpp5
-rw-r--r--ydb/services/bg_tasks/abstract/scheduler.h54
-rw-r--r--ydb/services/bg_tasks/abstract/state.cpp5
-rw-r--r--ydb/services/bg_tasks/abstract/state.h25
-rw-r--r--ydb/services/bg_tasks/abstract/task.cpp5
-rw-r--r--ydb/services/bg_tasks/abstract/task.h149
-rw-r--r--ydb/services/bg_tasks/ds_table/CMakeLists.txt35
-rw-r--r--ydb/services/bg_tasks/ds_table/add_tasks.cpp79
-rw-r--r--ydb/services/bg_tasks/ds_table/add_tasks.h27
-rw-r--r--ydb/services/bg_tasks/ds_table/assign_tasks.cpp45
-rw-r--r--ydb/services/bg_tasks/ds_table/assign_tasks.h27
-rw-r--r--ydb/services/bg_tasks/ds_table/config.cpp23
-rw-r--r--ydb/services/bg_tasks/ds_table/config.h22
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.cpp143
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.h102
-rw-r--r--ydb/services/bg_tasks/ds_table/executor_controller.cpp30
-rw-r--r--ydb/services/bg_tasks/ds_table/executor_controller.h39
-rw-r--r--ydb/services/bg_tasks/ds_table/fetch_tasks.cpp63
-rw-r--r--ydb/services/bg_tasks/ds_table/fetch_tasks.h29
-rw-r--r--ydb/services/bg_tasks/ds_table/finish_task.cpp27
-rw-r--r--ydb/services/bg_tasks/ds_table/finish_task.h26
-rw-r--r--ydb/services/bg_tasks/ds_table/interrupt.cpp44
-rw-r--r--ydb/services/bg_tasks/ds_table/interrupt.h40
-rw-r--r--ydb/services/bg_tasks/ds_table/lock_pinger.cpp42
-rw-r--r--ydb/services/bg_tasks/ds_table/lock_pinger.h25
-rw-r--r--ydb/services/bg_tasks/ds_table/task_enabled.cpp31
-rw-r--r--ydb/services/bg_tasks/ds_table/task_enabled.h32
-rw-r--r--ydb/services/bg_tasks/ds_table/task_executor.cpp31
-rw-r--r--ydb/services/bg_tasks/ds_table/task_executor.h54
-rw-r--r--ydb/services/bg_tasks/ds_table/task_executor_controller.cpp15
-rw-r--r--ydb/services/bg_tasks/ds_table/task_executor_controller.h28
-rw-r--r--ydb/services/bg_tasks/protos/CMakeLists.txt31
-rw-r--r--ydb/services/bg_tasks/protos/container.proto6
-rw-r--r--ydb/services/bg_tasks/service.cpp9
-rw-r--r--ydb/services/bg_tasks/service.h75
-rw-r--r--ydb/services/bg_tasks/ut/CMakeLists.darwin.txt57
-rw-r--r--ydb/services/bg_tasks/ut/CMakeLists.linux-aarch64.txt59
-rw-r--r--ydb/services/bg_tasks/ut/CMakeLists.linux.txt61
-rw-r--r--ydb/services/bg_tasks/ut/CMakeLists.txt15
-rw-r--r--ydb/services/bg_tasks/ut/task_emulator.cpp50
-rw-r--r--ydb/services/bg_tasks/ut/task_emulator.h101
-rw-r--r--ydb/services/bg_tasks/ut/ut_tasks.cpp70
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.cpp13
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.h2
-rw-r--r--ydb/services/metadata/ds_table/config.h1
-rw-r--r--ydb/services/metadata/initializer/accessor_init.cpp22
-rw-r--r--ydb/services/metadata/initializer/accessor_init.h5
-rw-r--r--ydb/services/metadata/request/common.h5
-rw-r--r--ydb/services/metadata/request/request_actor.h106
67 files changed, 2487 insertions, 25 deletions
diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt
index 1dad9985c24..0b09685d4ab 100644
--- a/ydb/core/driver_lib/run/CMakeLists.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.txt
@@ -124,6 +124,8 @@ target_link_libraries(run PUBLIC
ydb-services-local_discovery
services-metadata-ds_table
ydb-services-metadata
+ services-bg_tasks-ds_table
+ ydb-services-bg_tasks
ydb-services-monitoring
ydb-services-persqueue_cluster_discovery
ydb-services-persqueue_v1
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 37119176f82..f5e0d339367 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -151,6 +151,9 @@
#include <ydb/services/metadata/ds_table/service.h>
#include <ydb/services/metadata/service.h>
+#include <ydb/services/bg_tasks/ds_table/executor.h>
+#include <ydb/services/bg_tasks/service.h>
+
#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -2012,7 +2015,25 @@ void TMetadataProviderInitializer::InitializeServices(NActors::TActorSystemSetup
if (serviceConfig.IsEnabled()) {
auto service = NMetadataProvider::CreateService(serviceConfig);
setup->LocalServices.push_back(std::make_pair(
- NMetadataProvider::MakeServiceId(),
+ NMetadataProvider::MakeServiceId(NodeId),
+ TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
+ }
+}
+
+TBackgroundTasksInitializer::TBackgroundTasksInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig) {
+}
+
+void TBackgroundTasksInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ NBackgroundTasks::TConfig serviceConfig;
+ if (Config.HasBackgroundTasksConfig()) {
+ Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetBackgroundTasksConfig()));
+ }
+
+ if (serviceConfig.IsEnabled()) {
+ auto service = NBackgroundTasks::CreateService(serviceConfig);
+ setup->LocalServices.push_back(std::make_pair(
+ NBackgroundTasks::MakeServiceId(NodeId),
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
}
}
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 9ec1bdccf4f..f4bf8d00bd4 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -389,6 +389,12 @@ public:
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};
+class TBackgroundTasksInitializer: public IKikimrServicesInitializer {
+public:
+ TBackgroundTasksInitializer(const TKikimrRunConfig& runConfig);
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
class TMemoryLogInitializer : public IKikimrServicesInitializer {
size_t LogBufferSize = 0;
size_t LogGrainSize = 0;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 2ef714b7273..0f42c3b819d 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -585,6 +585,16 @@ message TMetadataProviderConfig {
optional TInternalRequestConfig RequestConfig = 3;
}
+message TBackgroundTasksConfig {
+ optional bool Enabled = 1 [default = false];
+ optional TInternalRequestConfig RequestConfig = 2;
+ optional uint32 PullPeriodSeconds = 3 [default = 10];
+ optional uint32 PingPeriodSeconds = 4 [default = 2];
+ optional uint32 PingCheckPeriodSeconds = 5 [default = 20];
+ optional uint32 MaxInFlight = 6 [default = 8];
+ optional string InternalTablePath = 7;
+}
+
message TMemoryLogConfig {
optional uint64 LogBufferSize = 1;
optional uint64 LogGrainSize = 2;
@@ -1642,6 +1652,7 @@ message TAppConfig {
optional TFailureInjectionConfig FailureInjectionConfig = 56;
optional THttpProxyConfig PublicHttpConfig = 57;
optional TMetadataProviderConfig MetadataProviderConfig = 59;
+ optional TBackgroundTasksConfig BackgroundTasksConfig = 60;
optional NYq.NConfig.TConfig YandexQueryConfig = 50; // TODO: remove after migration to FederatedQueryConfig
diff --git a/ydb/core/testlib/CMakeLists.txt b/ydb/core/testlib/CMakeLists.txt
index 208914902ec..636e2c0d694 100644
--- a/ydb/core/testlib/CMakeLists.txt
+++ b/ydb/core/testlib/CMakeLists.txt
@@ -92,6 +92,8 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-rate_limiter
ydb-services-monitoring
services-metadata-ds_table
+ services-bg_tasks-ds_table
+ ydb-services-bg_tasks
ydb-services-ydb
ydb-services-yq
ydb-core-http_proxy
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index d3ace1b5383..6c1e1d33d80 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -90,6 +90,8 @@
#include <ydb/core/yq/libs/mock/yql_mock.h>
#include <ydb/services/metadata/ds_table/service.h>
#include <ydb/services/metadata/service.h>
+#include <ydb/services/bg_tasks/ds_table/executor.h>
+#include <ydb/services/bg_tasks/service.h>
#include <ydb/library/folder_service/mock/mock_folder_service.h>
#include <ydb/core/client/server/msgbus_server_tracer.h>
@@ -628,6 +630,11 @@ namespace Tests {
const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NMetadataProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid);
}
+ if (Settings->IsEnableBackgroundTasks()) {
+ auto* actor = NBackgroundTasks::CreateService(NBackgroundTasks::TConfig());
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
+ Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid);
+ }
Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
auto sysViewService = NSysView::CreateSysViewServiceForTests();
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index b69246cd534..649cb5d0f7a 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -208,6 +208,7 @@ namespace Tests {
TServerSettings& operator=(const TServerSettings& settings) = default;
private:
YDB_FLAG_ACCESSOR(EnableMetadataProvider, true);
+ YDB_FLAG_ACCESSOR(EnableBackgroundTasks, false);
};
class TServer : public TThrRefBase, TMoveOnly {
diff --git a/ydb/core/tx/tiering/decoder.cpp b/ydb/core/tx/tiering/decoder.cpp
index 40196235569..f8b0c5a190e 100644
--- a/ydb/core/tx/tiering/decoder.cpp
+++ b/ydb/core/tx/tiering/decoder.cpp
@@ -31,6 +31,34 @@ bool TDecoderBase::Read(const ui32 columnIdx, TDuration& result, const Ydb::Valu
return true;
}
+bool TDecoderBase::Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) const {
+ auto& pValue = r.items()[columnIdx];
+ if (pValue.has_bool_value()) {
+ result = pValue.bool_value();
+ } else {
+ ALS_WARN(0) << "incorrect type for instant seconds parsing";
+ return false;
+ }
+ return true;
+}
+
+bool TDecoderBase::Read(const ui32 columnIdx, TInstant& result, const Ydb::Value& r) const {
+ auto& pValue = r.items()[columnIdx];
+ if (pValue.has_uint32_value()) {
+ result = TInstant::Seconds(pValue.uint32_value());
+ } else if (pValue.has_int64_value()) {
+ result = TInstant::Seconds(pValue.int64_value());
+ } else if (pValue.has_uint64_value()) {
+ result = TInstant::Seconds(pValue.uint64_value());
+ } else if (pValue.has_int32_value()) {
+ result = TInstant::Seconds(pValue.int32_value());
+ } else {
+ ALS_WARN(0) << "incorrect type for instant seconds parsing";
+ return false;
+ }
+ return true;
+}
+
bool TDecoderBase::ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const {
const TString& s = r.items()[columnIdx].bytes_value();
if (!::google::protobuf::TextFormat::ParseFromString(s, &result)) {
diff --git a/ydb/core/tx/tiering/decoder.h b/ydb/core/tx/tiering/decoder.h
index 6866feedebb..587f4ae9da8 100644
--- a/ydb/core/tx/tiering/decoder.h
+++ b/ydb/core/tx/tiering/decoder.h
@@ -11,6 +11,8 @@ public:
bool Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const;
bool ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const;
bool Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const;
+ bool Read(const ui32 columnIdx, TInstant& result, const Ydb::Value& r) const;
+ bool Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) const;
};
}
diff --git a/ydb/services/CMakeLists.txt b/ydb/services/CMakeLists.txt
index da7cf033015..f21cbb406ca 100644
--- a/ydb/services/CMakeLists.txt
+++ b/ydb/services/CMakeLists.txt
@@ -7,6 +7,7 @@
add_subdirectory(auth)
+add_subdirectory(bg_tasks)
add_subdirectory(cms)
add_subdirectory(datastreams)
add_subdirectory(discovery)
diff --git a/ydb/services/bg_tasks/CMakeLists.txt b/ydb/services/bg_tasks/CMakeLists.txt
new file mode 100644
index 00000000000..500fe426388
--- /dev/null
+++ b/ydb/services/bg_tasks/CMakeLists.txt
@@ -0,0 +1,25 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(abstract)
+add_subdirectory(ds_table)
+add_subdirectory(protos)
+add_subdirectory(ut)
+
+add_library(ydb-services-bg_tasks)
+target_link_libraries(ydb-services-bg_tasks PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-metadata-abstract
+ services-bg_tasks-abstract
+ services-bg_tasks-protos
+)
+target_sources(ydb-services-bg_tasks PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/service.cpp
+)
diff --git a/ydb/services/bg_tasks/abstract/CMakeLists.txt b/ydb/services/bg_tasks/abstract/CMakeLists.txt
new file mode 100644
index 00000000000..871806f32db
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/CMakeLists.txt
@@ -0,0 +1,27 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-bg_tasks-abstract)
+target_link_libraries(services-bg_tasks-abstract PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-library-accessor
+ cpp-actors-core
+ api-protos
+ services-bg_tasks-protos
+ ydb-core-base
+)
+target_sources(services-bg_tasks-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/interface.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/scheduler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/activity.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/state.cpp
+)
diff --git a/ydb/services/bg_tasks/abstract/activity.cpp b/ydb/services/bg_tasks/abstract/activity.cpp
new file mode 100644
index 00000000000..ecca1f20fd7
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/activity.cpp
@@ -0,0 +1,5 @@
+#include "activity.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+}
diff --git a/ydb/services/bg_tasks/abstract/activity.h b/ydb/services/bg_tasks/abstract/activity.h
new file mode 100644
index 00000000000..b67b5797f99
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/activity.h
@@ -0,0 +1,55 @@
+#pragma once
+#include "interface.h"
+#include "state.h"
+
+#include <ydb/core/base/events.h>
+
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/object_factory/object_factory.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class ITaskExecutorController {
+protected:
+ virtual void DoTaskInterrupted(ITaskState::TPtr taskState) = 0;
+ virtual void DoTaskFinished() = 0;
+
+public:
+ using TPtr = std::shared_ptr<ITaskExecutorController>;
+ virtual ~ITaskExecutorController() = default;
+
+ void TaskInterrupted(ITaskState::TPtr taskState) {
+ return DoTaskInterrupted(taskState);
+ }
+ virtual void TaskFinished() {
+ return DoTaskFinished();
+ }
+};
+
+class ITaskActivity: public IStringSerializable {
+public:
+ using TPtr = std::shared_ptr<ITaskActivity>;
+ using TFactory = NObjectFactory::TObjectFactory<ITaskActivity, TString>;
+protected:
+ virtual void DoExecute(ITaskExecutorController::TPtr controller, const TTaskStateContainer& state) = 0;
+ virtual void DoFinished(const TTaskStateContainer& /*state*/) {
+
+ }
+public:
+ virtual TString GetClassName() const = 0;
+ void Execute(ITaskExecutorController::TPtr controller, const TTaskStateContainer& state) {
+ DoExecute(controller, state);
+ };
+ void Finished(const TTaskStateContainer& state) {
+ DoFinished(state);
+ };
+};
+
+class TTaskActivityContainer: public TInterfaceStringContainer<ITaskActivity> {
+private:
+ using TBase = TInterfaceStringContainer<ITaskActivity>;
+public:
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/services/bg_tasks/abstract/common.cpp b/ydb/services/bg_tasks/abstract/common.cpp
new file mode 100644
index 00000000000..490f24b3827
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/common.cpp
@@ -0,0 +1,5 @@
+#include "common.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+}
diff --git a/ydb/services/bg_tasks/abstract/common.h b/ydb/services/bg_tasks/abstract/common.h
new file mode 100644
index 00000000000..4c447c1626b
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/common.h
@@ -0,0 +1,27 @@
+#pragma once
+#include <ydb/core/base/events.h>
+
+#include <library/cpp/actors/core/events.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+enum EEvents {
+ EvAddTask = EventSpaceBegin(TKikimrEvents::ES_BACKGROUND_TASKS),
+ EvStartAssign,
+ EvAssignFinished,
+ EvFetchingFinished,
+ EvTaskFinished,
+ EvTaskInterrupted,
+ EvTaskFetched,
+ EvTaskExecutorFinished,
+ EvUpdateTaskEnabled,
+ EvAddTaskResult,
+ EvUpdateTaskEnabledResult,
+ EvLockPingerStart,
+ EvLockPingerFinished,
+ EvEnd
+};
+
+static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_BACKGROUND_TASKS), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_BACKGROUND_TASKS)");
+
+}
diff --git a/ydb/services/bg_tasks/abstract/interface.cpp b/ydb/services/bg_tasks/abstract/interface.cpp
new file mode 100644
index 00000000000..a0587cc5830
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/interface.cpp
@@ -0,0 +1,5 @@
+#include "interface.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+}
diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h
new file mode 100644
index 00000000000..65065963329
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/interface.h
@@ -0,0 +1,290 @@
+#pragma once
+#include <ydb/core/base/events.h>
+#include <ydb/services/bg_tasks/protos/container.pb.h>
+
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/object_factory/object_factory.h>
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/actors/core/log.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class IStringSerializable {
+protected:
+ virtual bool DoDeserializeFromString(const TString& data) = 0;
+ virtual TString DoSerializeToString() const = 0;
+public:
+ bool DeserializeFromString(const TString& data) {
+ return DoDeserializeFromString(data);
+ }
+
+ TString SerializeToString() const {
+ return DoSerializeToString();
+ }
+ virtual ~IStringSerializable() = default;
+};
+
+template <class TBaseClass = IStringSerializable>
+class IJsonStringSerializable: public TBaseClass {
+protected:
+ virtual bool DoDeserializeFromString(const TString& data) override final {
+ std::optional<NJson::TJsonValue> jsonData = ParseStringToStorageObject(data);
+ if (!jsonData) {
+ return false;
+ }
+ return DeserializeFromJson(*jsonData);
+ }
+ virtual TString DoSerializeToString() const override final {
+ NJson::TJsonValue jsonData = SerializeToJson();
+ return jsonData.GetStringRobust();
+ }
+ virtual NJson::TJsonValue DoSerializeToJson() const = 0;
+ virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonData) = 0;
+public:
+ static std::optional<NJson::TJsonValue> ParseStringToStorageObject(const TString& data) {
+ NJson::TJsonValue jsonData;
+ if (!NJson::ReadJsonFastTree(data, &jsonData)) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as json: " << Base64Encode(data);
+ return {};
+ }
+ return jsonData;
+ }
+ static NJson::TJsonValue SerializeToStorageObject(const IJsonStringSerializable& object) {
+ return object.SerializeToJson();
+ }
+ NJson::TJsonValue SerializeToJson() const {
+ return DoSerializeToJson();
+ }
+ bool DeserializeFromJson(const NJson::TJsonValue& jsonData) {
+ return DoDeserializeFromJson(jsonData);
+ }
+};
+
+template <class TProtoClass, class TBaseClass = IStringSerializable>
+class IProtoStringSerializable: public TBaseClass {
+private:
+ using TSelf = IProtoStringSerializable<TProtoClass, TBaseClass>;
+protected:
+ virtual bool DoDeserializeFromString(const TString& data) override final {
+ std::optional<TProtoClass> protoData = ParseStringToStorageObject(data);
+ if (!protoData) {
+ return false;
+ }
+ return DeserializeFromProto(*protoData);
+ }
+ virtual TString DoSerializeToString() const override final {
+ TProtoClass protoData = SerializeToProto();
+ return protoData.SerializeAsString();
+ }
+ virtual TProtoClass DoSerializeToProto() const = 0;
+ virtual bool DoDeserializeFromProto(const TProtoClass& protoData) = 0;
+public:
+ static std::optional<TProtoClass> ParseStringToStorageObject(const TString& data) {
+ TProtoClass protoData;
+ if (!protoData.ParseFromArray(data.data(), data.size())) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as proto: " << Base64Encode(data);
+ return {};
+ }
+ return protoData;
+ }
+ static TProtoClass SerializeToStorageObject(const IProtoStringSerializable& object) {
+ return object.SerializeToProto();
+ }
+ TProtoClass SerializeToProto() const {
+ return DoSerializeToProto();
+ }
+ bool DeserializeFromProto(const TProtoClass& protoData) {
+ return DoDeserializeFromProto(protoData);
+ }
+};
+
+template <class IInterface>
+class TCommonInterfaceContainer {
+protected:
+ std::shared_ptr<IInterface> Object;
+ using TFactory = typename IInterface::TFactory;
+public:
+ TCommonInterfaceContainer() = default;
+ TCommonInterfaceContainer(std::shared_ptr<IInterface> object)
+ : Object(object) {
+
+ }
+
+ bool HasObject() const {
+ return !!Object;
+ }
+
+ template <class T>
+ const T& GetAsSafe() const {
+ auto result = std::dynamic_pointer_cast<T>(Object);
+ Y_VERIFY(!!result);
+ return *result;
+ }
+
+ template <class T>
+ T& GetAsSafe() {
+ auto result = std::dynamic_pointer_cast<T>(Object);
+ Y_VERIFY(!!result);
+ return *result;
+ }
+
+ std::shared_ptr<IInterface> GetObjectPtr() const {
+ return Object;
+ }
+
+ const IInterface* operator->() const {
+ return Object.get();
+ }
+
+ IInterface* operator->() {
+ return Object.get();
+ }
+
+ bool operator!() const {
+ return !Object;
+ }
+
+};
+
+template <class IInterface>
+class TInterfaceStringContainer: public TCommonInterfaceContainer<IInterface> {
+protected:
+ using TBase = TCommonInterfaceContainer<IInterface>;
+ using TFactory = typename TBase::TFactory;
+ using TBase::Object;
+public:
+ using TBase::TBase;
+ TString SerializeToStringBase64() const {
+ return Base64Encode(SerializeToString());
+ }
+
+ TString SerializeToString() const {
+ NKikimrProto::TStringContainer result;
+ if (!Object) {
+ return result.SerializeAsString();
+ }
+ result.SetClassName(Object->GetClassName());
+ result.SetBinaryData(Object->SerializeToString());
+ return result.SerializeAsString();
+ }
+
+ bool DeserializeFromString(const TString& data) {
+ if (!data) {
+ Object = nullptr;
+ return true;
+ }
+ NKikimrProto::TStringContainer protoData;
+ if (!protoData.ParseFromArray(data.data(), data.size())) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as proto: " << Base64Encode(data);
+ return {};
+ }
+ const TString& className = protoData.GetClassName();
+ std::shared_ptr<IInterface> object(TFactory::Construct(protoData.GetClassName()));
+ if (!object) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name();
+ return false;
+ }
+
+ if (!object->DeserializeFromString(protoData.GetBinaryData())) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse class instance: " << className << " for " << typeid(IInterface).name();
+ return false;
+ }
+ Object = object;
+ return true;
+ }
+};
+
+class TDefaultJsonContainerPolicy {
+public:
+ static TString GetClassName(const NJson::TJsonValue& jsonInfo) {
+ return jsonInfo["className"].GetStringRobust();
+ }
+ static void SetClassName(NJson::TJsonValue& jsonInfo, const TString& className) {
+ jsonInfo["className"] = className;
+ }
+};
+
+template <class IInterface, class TOperatorPolicy = TDefaultJsonContainerPolicy>
+class TInterfaceJsonContainer: public TCommonInterfaceContainer<IInterface> {
+protected:
+ using TBase = TCommonInterfaceContainer<IInterface>;
+ using TFactory = typename TBase::TFactory;
+ using TBase::Object;
+public:
+ using TBase::TBase;
+ NJson::TJsonValue SerializeToJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ if (!Object) {
+ return result;
+ }
+ TOperatorPolicy::SetClassName(result, Object->GetClassName());
+ result.InsertValue("objectData", Object->SerializeToJson());
+ return result;
+ }
+
+ bool DeserializeFromJson(const NJson::TJsonValue& data) {
+ const TString& className = TOperatorPolicy::GetClassName(data);
+ std::shared_ptr<IInterface> object(TFactory::Construct(className));
+ if (!object) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name();
+ return false;
+ }
+ if (!object->DeserializeFromJson(data["objectData"])) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse class instance: " << className << " for " << typeid(IInterface).name();
+ return false;
+ }
+ Object = object;
+ return true;
+ }
+};
+
+class TDefaultProtoContainerPolicy {
+public:
+ template <class TProto>
+ static TString GetClassName(const TProto& protoInfo) {
+ return protoInfo.GetClassName();
+ }
+ template <class TProto>
+ static void SetClassName(TProto& protoInfo, const TString& className) {
+ protoInfo.SetClassName(className);
+ }
+};
+
+template <class IInterface, class TOperatorPolicy = TDefaultProtoContainerPolicy>
+class TInterfaceProtoContainer: public TCommonInterfaceContainer<IInterface> {
+private:
+ using TProto = typename IInterface::TProto;
+protected:
+ using TBase = TCommonInterfaceContainer<IInterface>;
+ using TFactory = typename TBase::TFactory;
+ using TBase::Object;
+public:
+ using TBase::TBase;
+ bool DeserializeFromProto(const TProto& data) {
+ const TString& className = TOperatorPolicy::GetClassName(data);
+ std::shared_ptr<IInterface> object(TFactory::Construct(className));
+ if (!object) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name();
+ return false;
+ }
+ if (!object->DeserializeFromProto(data)) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse class instance: " << className << " for " << typeid(IInterface).name();
+ return false;
+ }
+ Object = object;
+ return true;
+ }
+
+ TProto SerializeToProto() const {
+ TProto result;
+ if (!Object) {
+ return result;
+ }
+ result = Object->SerializeToProto();
+ TOperatorPolicy::SetClassName(result, Object->GetClassName());
+ return result;
+ }
+};
+
+}
diff --git a/ydb/services/bg_tasks/abstract/scheduler.cpp b/ydb/services/bg_tasks/abstract/scheduler.cpp
new file mode 100644
index 00000000000..8c30232e4f8
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/scheduler.cpp
@@ -0,0 +1,5 @@
+#include "scheduler.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+}
diff --git a/ydb/services/bg_tasks/abstract/scheduler.h b/ydb/services/bg_tasks/abstract/scheduler.h
new file mode 100644
index 00000000000..84ff7bab728
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/scheduler.h
@@ -0,0 +1,54 @@
+#pragma once
+#include "interface.h"
+#include "state.h"
+
+#include <ydb/core/base/events.h>
+
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/object_factory/object_factory.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class ITaskScheduler: public IStringSerializable {
+public:
+ using TPtr = std::shared_ptr<ITaskScheduler>;
+ using TFactory = NObjectFactory::TObjectFactory<ITaskScheduler, TString>;
+protected:
+ virtual std::optional<TInstant> DoGetNextStartInstant(const TInstant currentStartInstant, const TTaskStateContainer& state) const = 0;
+ virtual TInstant DoGetStartInstant() const = 0;
+public:
+ virtual TString GetClassName() const = 0;
+ std::optional<TInstant> GetNextStartInstant(const TInstant currentStartInstant, const TTaskStateContainer& state) const {
+ return DoGetNextStartInstant(currentStartInstant, state);
+ }
+ TInstant GetStartInstant() const {
+ return DoGetStartInstant();
+ }
+};
+
+class TTaskSchedulerContainer: public TInterfaceStringContainer<ITaskScheduler> {
+private:
+ using TBase = TInterfaceStringContainer<ITaskScheduler>;
+public:
+ using TBase::TBase;
+
+ std::optional<TInstant> GetNextStartInstant(const TInstant currentStartInstant, const TTaskStateContainer& state) const {
+ if (!Object) {
+ return TInstant::Zero();
+ }
+ return Object->GetNextStartInstant(currentStartInstant, state);
+ }
+
+ TInstant GetStartInstant() const {
+ if (!Object) {
+ return TInstant::Zero();
+ }
+ return Object->GetStartInstant();
+ }
+};
+
+class IJsonTaskScheduler: public IJsonStringSerializable<ITaskScheduler> {
+
+};
+
+}
diff --git a/ydb/services/bg_tasks/abstract/state.cpp b/ydb/services/bg_tasks/abstract/state.cpp
new file mode 100644
index 00000000000..5cd85884467
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/state.cpp
@@ -0,0 +1,5 @@
+#include "state.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+}
diff --git a/ydb/services/bg_tasks/abstract/state.h b/ydb/services/bg_tasks/abstract/state.h
new file mode 100644
index 00000000000..e4062dd6ecd
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/state.h
@@ -0,0 +1,25 @@
+#pragma once
+#include "interface.h"
+
+#include <ydb/core/base/events.h>
+
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/object_factory/object_factory.h>
+
+namespace NKikimr::NBackgroundTasks {
+class ITaskState: public IStringSerializable {
+public:
+ using TPtr = std::shared_ptr<ITaskState>;
+ using TFactory = NObjectFactory::TObjectFactory<ITaskState, TString>;
+public:
+ virtual TString GetClassName() const = 0;
+};
+
+class TTaskStateContainer: public TInterfaceStringContainer<ITaskState> {
+private:
+ using TBase = TInterfaceStringContainer<ITaskState>;
+public:
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/services/bg_tasks/abstract/task.cpp b/ydb/services/bg_tasks/abstract/task.cpp
new file mode 100644
index 00000000000..2fbe51fab03
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/task.cpp
@@ -0,0 +1,5 @@
+#include "task.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+}
diff --git a/ydb/services/bg_tasks/abstract/task.h b/ydb/services/bg_tasks/abstract/task.h
new file mode 100644
index 00000000000..eb87ea5e1b9
--- /dev/null
+++ b/ydb/services/bg_tasks/abstract/task.h
@@ -0,0 +1,149 @@
+#pragma once
+#include "activity.h"
+#include "scheduler.h"
+#include "state.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/events.h>
+#include <ydb/core/tx/tiering/decoder.h>
+#include <ydb/library/accessor/accessor.h>
+
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/object_factory/object_factory.h>
+#include <util/generic/guid.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TTask;
+
+class TTask {
+private:
+ YDB_ACCESSOR(TString, Id, TGUID::CreateTimebased().AsGuidString());
+ YDB_READONLY_DEF(TString, Class);
+ YDB_READONLY_DEF(TString, ExecutorId);
+ YDB_READONLY(TInstant, LastPing, TInstant::Zero());
+ YDB_READONLY(TInstant, StartInstant, TInstant::Zero());
+ YDB_READONLY(TInstant, ConstructInstant, TAppData::TimeProvider->Now());
+ YDB_READONLY_DEF(TTaskActivityContainer, Activity);
+ YDB_READONLY_DEF(TTaskSchedulerContainer, Scheduler);
+ YDB_ACCESSOR_DEF(TTaskStateContainer, State);
+ YDB_FLAG_ACCESSOR(Enabled, true);
+public:
+ TTask() = default;
+ TTask(ITaskActivity::TPtr activity, ITaskScheduler::TPtr scheduler)
+ : Activity(activity)
+ , Scheduler(scheduler)
+ {
+
+ }
+
+ void Execute(ITaskExecutorController::TPtr controller) const {
+ if (!Activity) {
+ controller->TaskFinished();
+ } else {
+ Activity.GetObjectPtr()->Execute(controller, State);
+ }
+ }
+
+ void Finished() const {
+ if (!!Activity) {
+ Activity.GetObjectPtr()->Finished(State);
+ }
+ }
+
+ class TDecoder: public NInternal::TDecoderBase {
+ private:
+ YDB_ACCESSOR(i32, IdIdx, -1);
+ YDB_ACCESSOR(i32, ClassIdx, -1);
+ YDB_ACCESSOR(i32, ExecutorIdIdx, -1);
+ YDB_ACCESSOR(i32, LastPingIdx, -1);
+ YDB_ACCESSOR(i32, StartInstantIdx, -1);
+ YDB_ACCESSOR(i32, ConstructInstantIdx, -1);
+ YDB_ACCESSOR(i32, ActivityIdx, -1);
+ YDB_ACCESSOR(i32, SchedulerIdx, -1);
+ YDB_ACCESSOR(i32, StateIdx, -1);
+ YDB_ACCESSOR(i32, EnabledIdx, -1);
+ public:
+ static inline const TString Id = "id";
+ static inline const TString Class = "class";
+ static inline const TString ExecutorId = "executorId";
+ static inline const TString LastPing = "lastPing";
+ static inline const TString StartInstant = "startInstant";
+ static inline const TString ConstructInstant = "constructInstant";
+ static inline const TString Activity = "activity";
+ static inline const TString Scheduler = "scheduler";
+ static inline const TString State = "state";
+ static inline const TString Enabled = "enabled";
+
+ TDecoder(const Ydb::ResultSet& rawData) {
+ IdIdx = GetFieldIndex(rawData, Id);
+ ClassIdx = GetFieldIndex(rawData, Class);
+ ExecutorIdIdx = GetFieldIndex(rawData, ExecutorId);
+ LastPingIdx = GetFieldIndex(rawData, LastPing);
+ StartInstantIdx = GetFieldIndex(rawData, StartInstant);
+ ConstructInstantIdx = GetFieldIndex(rawData, ConstructInstant);
+ ActivityIdx = GetFieldIndex(rawData, Activity);
+ SchedulerIdx = GetFieldIndex(rawData, Scheduler);
+ StateIdx = GetFieldIndex(rawData, State);
+ EnabledIdx = GetFieldIndex(rawData, Enabled);
+ }
+ };
+
+ bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) {
+ if (!decoder.Read(decoder.GetIdIdx(), Id, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetClassIdx(), Class, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetEnabledIdx(), EnabledFlag, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetExecutorIdIdx(), ExecutorId, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetLastPingIdx(), LastPing, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetStartInstantIdx(), StartInstant, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetConstructInstantIdx(), ConstructInstant, rawValue)) {
+ return false;
+ }
+ {
+ TString activityData;
+ if (!decoder.Read(decoder.GetActivityIdx(), activityData, rawValue)) {
+ return false;
+ }
+ if (!Activity.DeserializeFromString(activityData)) {
+ return false;
+ }
+ if (!Activity) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse task activity";
+ return false;
+ }
+ }
+ {
+ TString schedulerData;
+ if (!decoder.Read(decoder.GetSchedulerIdx(), schedulerData, rawValue)) {
+ return false;
+ }
+ if (!Scheduler.DeserializeFromString(schedulerData)) {
+ return false;
+ }
+ }
+ {
+ TString stateData;
+ if (!decoder.Read(decoder.GetStateIdx(), stateData, rawValue)) {
+ return false;
+ }
+ if (!State.DeserializeFromString(stateData)) {
+ return false;
+ }
+ }
+ return true;
+ }
+};
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.txt
new file mode 100644
index 00000000000..5fecb1aa1bc
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/CMakeLists.txt
@@ -0,0 +1,35 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-bg_tasks-ds_table)
+target_link_libraries(services-bg_tasks-ds_table PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-library-accessor
+ cpp-actors-core
+ api-protos
+ services-bg_tasks-abstract
+ services-metadata-initializer
+ ydb-core-base
+ services-metadata-request
+)
+target_sources(services-bg_tasks-ds_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/interrupt.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/finish_task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/assign_tasks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/add_tasks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_enabled.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/lock_pinger.cpp
+)
diff --git a/ydb/services/bg_tasks/ds_table/add_tasks.cpp b/ydb/services/bg_tasks/ds_table/add_tasks.cpp
new file mode 100644
index 00000000000..9482d0087ef
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/add_tasks.cpp
@@ -0,0 +1,79 @@
+#include "add_tasks.h"
+
+#include <ydb/services/bg_tasks/service.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TAddTasksActor::OnSessionId(const TString& sessionId) {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ sb << "DECLARE $activityString AS String;" << Endl;
+ sb << "DECLARE $taskId AS String;" << Endl;
+ sb << "DECLARE $schedulerString AS String;" << Endl;
+ sb << "DECLARE $enabled AS Bool;" << Endl;
+ sb << "DECLARE $className AS String;" << Endl;
+ sb << "DECLARE $startInstant AS Uint32;" << Endl;
+ sb << "DECLARE $constructInstant AS Uint32;" << Endl;
+ sb << "UPSERT INTO `" + Controller->GetTableName() + "` (id, enabled, class, startInstant, constructInstant, activity, scheduler)" << Endl;
+ sb << "VALUES" << Endl;
+ {
+ sb << "(" << Endl;
+ sb << "$taskId," << Endl;
+ sb << "$enabled," << Endl;
+ sb << "$className," << Endl;
+ sb << "$startInstant," << Endl;
+ sb << "$constructInstant," << Endl;
+ sb << "$activityString," << Endl;
+ sb << "$schedulerString" << Endl;
+ sb << ")" << Endl;
+ }
+ request.mutable_query()->set_yql_text(sb);
+
+ {
+ auto& param = (*request.mutable_parameters())["$enabled"];
+ param.mutable_value()->set_bool_value(Task.IsEnabled());
+ param.mutable_type()->set_type_id(Ydb::Type::BOOL);
+ }
+
+ {
+ auto& param = (*request.mutable_parameters())["$constructInstant"];
+ param.mutable_value()->set_uint32_value(Task.GetConstructInstant().Seconds());
+ param.mutable_type()->set_type_id(Ydb::Type::UINT32);
+ }
+
+ {
+ auto& param = (*request.mutable_parameters())["$startInstant"];
+ param.mutable_value()->set_uint32_value(Task.GetScheduler().GetStartInstant().Seconds());
+ param.mutable_type()->set_type_id(Ydb::Type::UINT32);
+ }
+
+ {
+ auto& param = (*request.mutable_parameters())["$className"];
+ param.mutable_value()->set_bytes_value(Task.GetClass());
+ param.mutable_type()->set_type_id(Ydb::Type::STRING);
+ }
+
+ auto& idString = (*request.mutable_parameters())["$taskId"];
+ idString.mutable_value()->set_bytes_value(Task.GetId());
+ idString.mutable_type()->set_type_id(Ydb::Type::STRING);
+
+ auto& aString = (*request.mutable_parameters())["$activityString"];
+ aString.mutable_value()->set_bytes_value(Task.GetActivity().SerializeToString());
+ aString.mutable_type()->set_type_id(Ydb::Type::STRING);
+
+ auto& sString = (*request.mutable_parameters())["$schedulerString"];
+ sString.mutable_value()->set_bytes_value(Task.GetScheduler().SerializeToString());
+ sString.mutable_type()->set_type_id(Ydb::Type::STRING);
+
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.mutable_tx_control()->set_commit_tx(true);
+
+ return request;
+}
+
+void TAddTasksActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*ev*/) {
+ Sender<TEvAddTaskResult>(Task.GetId(), true).SendTo(ResultWaiter);
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/add_tasks.h b/ydb/services/bg_tasks/ds_table/add_tasks.h
new file mode 100644
index 00000000000..ef1e3bdac92
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/add_tasks.h
@@ -0,0 +1,27 @@
+#pragma once
+#include "executor_controller.h"
+#include <ydb/services/bg_tasks/abstract/task.h>
+#include <ydb/services/metadata/request/request_actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TAddTasksActor: public NInternal::NRequest::TSessionedActor {
+private:
+ using TBase = NInternal::NRequest::TSessionedActor;
+ TExecutorController::TPtr Controller;
+ const TTask Task;
+ const TActorId ResultWaiter;
+protected:
+ virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& ev) override;
+ virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override;
+
+public:
+ TAddTasksActor(TExecutorController::TPtr controller, const TTask& task, const TActorId resultWaiter)
+ : TBase(controller->GetRequestConfig())
+ , Controller(controller)
+ , Task(task)
+ , ResultWaiter(resultWaiter)
+ {
+ }
+};
+}
diff --git a/ydb/services/bg_tasks/ds_table/assign_tasks.cpp b/ydb/services/bg_tasks/ds_table/assign_tasks.cpp
new file mode 100644
index 00000000000..03a43a0695a
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/assign_tasks.cpp
@@ -0,0 +1,45 @@
+#include "assign_tasks.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TAssignTasksActor::OnSessionId(const TString& sessionId) {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ const auto now = TActivationContext::Now();
+ sb << "DECLARE $executorId AS String;" << Endl;
+ sb << "DECLARE $lastPingCriticalBorder AS Uint32;" << Endl;
+ sb << "DECLARE $lastPingNewValue AS Uint32;" << Endl;
+ sb << "$ids = (SELECT id FROM `" << Controller->GetTableName() << "`"
+ << " WHERE (lastPing < $lastPingCriticalBorder"
+ << " OR executorId IS NULL) AND enabled = true"
+ << " LIMIT " << TasksCount << ");" << Endl;
+ sb << "UPSERT INTO `" + Controller->GetTableName() + "`"
+ << " SELECT id, $executorId as executorId, $lastPingNewValue as lastPing"
+ << " FROM $ids";
+ {
+ auto& param = (*request.mutable_parameters())["$lastPingCriticalBorder"];
+ param.mutable_value()->set_uint32_value((now - Controller->GetConfig().GetPingCheckPeriod()).Seconds());
+ param.mutable_type()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto& param = (*request.mutable_parameters())["$lastPingNewValue"];
+ param.mutable_value()->set_uint32_value(now.Seconds());
+ param.mutable_type()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto& param = (*request.mutable_parameters())["$executorId"];
+ param.mutable_value()->set_bytes_value(ExecutorId);
+ param.mutable_type()->set_type_id(Ydb::Type::STRING);
+ }
+ request.mutable_query()->set_yql_text(sb);
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.mutable_tx_control()->set_commit_tx(true);
+ return request;
+}
+
+void TAssignTasksActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*result*/) {
+ Controller->AssignFinished();
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/assign_tasks.h b/ydb/services/bg_tasks/ds_table/assign_tasks.h
new file mode 100644
index 00000000000..640fa39aaee
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/assign_tasks.h
@@ -0,0 +1,27 @@
+#pragma once
+#include "executor_controller.h"
+
+#include <ydb/services/metadata/request/request_actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TAssignTasksActor: public NInternal::NRequest::TSessionedActor {
+private:
+ using TBase = NInternal::NRequest::TSessionedActor;
+ TExecutorController::TPtr Controller;
+ const ui32 TasksCount;
+ const TString ExecutorId;
+
+ virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override;
+ virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override;
+public:
+ TAssignTasksActor(const ui32 tasksCount, TExecutorController::TPtr controller, const TString& executorId)
+ : TBase(controller->GetRequestConfig())
+ , Controller(controller)
+ , TasksCount(tasksCount)
+ , ExecutorId(executorId)
+ {
+
+ }
+};
+}
diff --git a/ydb/services/bg_tasks/ds_table/config.cpp b/ydb/services/bg_tasks/ds_table/config.cpp
new file mode 100644
index 00000000000..0549e811aa5
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/config.cpp
@@ -0,0 +1,23 @@
+#include "config.h"
+#include <ydb/core/base/appdata.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+bool TConfig::DeserializeFromProto(const NKikimrConfig::TBackgroundTasksConfig& config) {
+ EnabledFlag = config.GetEnabled();
+ if (!RequestConfig.DeserializeFromProto(config.GetRequestConfig())) {
+ return false;
+ }
+ PullPeriod = TDuration::Seconds(config.GetPullPeriodSeconds());
+ PingPeriod = TDuration::Seconds(config.GetPingPeriodSeconds());
+ PingCheckPeriod = TDuration::Seconds(config.GetPingCheckPeriodSeconds());
+ MaxInFlight = config.GetMaxInFlight();
+ InternalTablePath = config.GetInternalTablePath();
+ return true;
+}
+
+TString TConfig::GetTablePath() const {
+ return AppData()->TenantName + "/" + InternalTablePath;
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/config.h b/ydb/services/bg_tasks/ds_table/config.h
new file mode 100644
index 00000000000..1d36f5bbc68
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/config.h
@@ -0,0 +1,22 @@
+#pragma once
+#include <ydb/core/protos/config.pb.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/metadata/request/config.h>
+
+#include <util/datetime/base.h>
+
+namespace NKikimr::NBackgroundTasks {
+class TConfig {
+private:
+ YDB_READONLY_DEF(NInternal::NRequest::TConfig, RequestConfig);
+ YDB_READONLY(TDuration, PullPeriod, TDuration::Seconds(10));
+ YDB_READONLY(TDuration, PingPeriod, TDuration::Seconds(2));
+ YDB_READONLY(TDuration, PingCheckPeriod, TDuration::Seconds(20));
+ YDB_READONLY(ui32, MaxInFlight, 1);
+ YDB_READONLY(TString, InternalTablePath, ".bg_tasks/tasks");
+ YDB_READONLY_FLAG(Enabled, false);
+public:
+ bool DeserializeFromProto(const NKikimrConfig::TBackgroundTasksConfig& config);
+ TString GetTablePath() const;
+};
+}
diff --git a/ydb/services/bg_tasks/ds_table/executor.cpp b/ydb/services/bg_tasks/ds_table/executor.cpp
new file mode 100644
index 00000000000..252e8c79521
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/executor.cpp
@@ -0,0 +1,143 @@
+#include "add_tasks.h"
+#include "assign_tasks.h"
+#include "executor.h"
+#include "lock_pinger.h"
+#include "task_executor.h"
+#include "task_enabled.h"
+#include "fetch_tasks.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TExecutor::BuildModifiers() const {
+ const TString tableName = Config.GetTablePath();
+ TVector<NMetadataProvider::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(tableName);
+ request.add_primary_key("id");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("id");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("enabled");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("class");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("executorId");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("lastPing");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("startInstant");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("constructInstant");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("activity");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("scheduler");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("state");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
+ }
+ return result;
+}
+
+void TExecutor::Handle(TEvStartAssign::TPtr& /*ev*/) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "start assign";
+ if (Config.GetMaxInFlight() > CurrentTaskIds.size()) {
+ Register(new TAssignTasksActor(Config.GetMaxInFlight() - CurrentTaskIds.size(), Controller, ExecutorId));
+ }
+}
+
+void TExecutor::Handle(TEvAssignFinished::TPtr& /*ev*/) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "assign finished";
+ Register(new TFetchTasksActor(CurrentTaskIds, ExecutorId, Controller));
+}
+
+void TExecutor::Handle(TEvFetchingFinished::TPtr& /*ev*/) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "assign scheduled: " << Config.GetPullPeriod();
+ Schedule(Config.GetPullPeriod(), new TEvStartAssign);
+}
+
+void TExecutor::Handle(TEvLockPingerFinished::TPtr& /*ev*/) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "pinger scheduled: " << Config.GetPingPeriod();
+ Schedule(Config.GetPingPeriod(), new TEvLockPingerStart);
+}
+
+void TExecutor::Handle(TEvLockPingerStart::TPtr& /*ev*/) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "pinger start";
+ if (CurrentTaskIds.size()) {
+ Register(new TLockPingerActor(Controller, CurrentTaskIds));
+ } else {
+ Schedule(Config.GetPingPeriod(), new TEvLockPingerStart);
+ }
+}
+
+void TExecutor::Handle(TEvTaskFetched::TPtr& ev) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "task fetched";
+ if (CurrentTaskIds.emplace(ev->Get()->GetTask().GetId()).second) {
+ Register(new TTaskExecutor(ev->Get()->GetTask(), Controller));
+ }
+}
+
+void TExecutor::Handle(TEvTaskExecutorFinished::TPtr& ev) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "task executor finished";
+ Y_VERIFY(CurrentTaskIds.contains(ev->Get()->GetTaskId()));
+ CurrentTaskIds.erase(ev->Get()->GetTaskId());
+ Sender<TEvStartAssign>().SendTo(SelfId());
+}
+
+void TExecutor::Handle(TEvAddTask::TPtr& ev) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "add task";
+ Register(new TAddTasksActor(Controller, ev->Get()->GetTask(), ev->Sender));
+}
+
+void TExecutor::Handle(TEvUpdateTaskEnabled::TPtr& ev) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "start task";
+ Register(new TUpdateTaskEnabledActor(Controller, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender));
+}
+
+void TExecutor::RegisterState() {
+ Controller = std::make_shared<TExecutorController>(SelfId(), Config);
+ Become(&TExecutor::StateMain);
+}
+
+void TExecutor::OnInitialized() {
+ Sender<TEvStartAssign>().SendTo(SelfId());
+ Schedule(Config.GetPingPeriod(), new TEvLockPingerStart);
+}
+
+NActors::IActor* CreateService(const TConfig& config) {
+ return new TExecutor(config);
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h
new file mode 100644
index 00000000000..1a07c093991
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/executor.h
@@ -0,0 +1,102 @@
+#pragma once
+#include "config.h"
+#include "executor_controller.h"
+
+#include <ydb/services/bg_tasks/abstract/common.h>
+#include <ydb/services/bg_tasks/abstract/task.h>
+#include <ydb/services/bg_tasks/service.h>
+#include <ydb/services/metadata/initializer/accessor_init.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TEvLockPingerFinished: public TEventLocal<TEvLockPingerFinished, EEvents::EvLockPingerFinished> {
+
+};
+
+class TEvLockPingerStart: public TEventLocal<TEvLockPingerStart, EEvents::EvLockPingerStart> {
+
+};
+
+class TEvStartAssign: public TEventLocal<TEvStartAssign, EEvents::EvStartAssign> {
+
+};
+
+class TEvAssignFinished: public TEventLocal<TEvAssignFinished, EEvents::EvAssignFinished> {
+
+};
+
+class TEvFetchingFinished: public TEventLocal<TEvFetchingFinished, EEvents::EvFetchingFinished> {
+
+};
+
+class TEvTaskFetched: public TEventLocal<TEvTaskFetched, EEvents::EvTaskFetched> {
+private:
+ YDB_READONLY_DEF(TTask, Task);
+public:
+ TEvTaskFetched(const TTask& task)
+ : Task(task)
+ {
+
+ }
+};
+
+class TEvTaskExecutorFinished: public TEventLocal<TEvTaskExecutorFinished, EEvents::EvTaskExecutorFinished> {
+private:
+ YDB_READONLY_DEF(TString, TaskId);
+public:
+ TEvTaskExecutorFinished(const TString& taskId)
+ : TaskId(taskId)
+ {
+
+ }
+};
+
+class TExecutor: public NMetadataProvider::TDSAccessorInitialized {
+private:
+ using TBase = NMetadataProvider::TDSAccessorInitialized;
+ TString TableName;
+ const TString ExecutorId = TGUID::CreateTimebased().AsUuidString();
+ const TConfig Config;
+ std::set<TString> CurrentTaskIds;
+ TExecutorController::TPtr Controller;
+protected:
+ virtual void RegisterState() override;
+ virtual void OnInitialized() override;
+ virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const override;
+
+ void Handle(TEvStartAssign::TPtr& ev);
+ void Handle(TEvAssignFinished::TPtr& ev);
+ void Handle(TEvFetchingFinished::TPtr& ev);
+ void Handle(TEvTaskFetched::TPtr& ev);
+ void Handle(TEvTaskExecutorFinished::TPtr& ev);
+ void Handle(TEvAddTask::TPtr& ev);
+ void Handle(TEvUpdateTaskEnabled::TPtr& ev);
+ void Handle(TEvLockPingerStart::TPtr& ev);
+ void Handle(TEvLockPingerFinished::TPtr& ev);
+
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvStartAssign, Handle);
+ hFunc(TEvAssignFinished, Handle);
+ hFunc(TEvFetchingFinished, Handle);
+ hFunc(TEvTaskFetched, Handle);
+ hFunc(TEvAddTask, Handle);
+ hFunc(TEvTaskExecutorFinished, Handle);
+ hFunc(TEvLockPingerStart, Handle);
+ hFunc(TEvLockPingerFinished, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+
+public:
+ TExecutor(const TConfig& config)
+ : TBase(config.GetRequestConfig())
+ , Config(config)
+ {
+ }
+};
+
+IActor* CreateService(const TConfig& config);
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.cpp b/ydb/services/bg_tasks/ds_table/executor_controller.cpp
new file mode 100644
index 00000000000..14f9a9ac691
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/executor_controller.cpp
@@ -0,0 +1,30 @@
+#include "executor.h"
+#include "executor_controller.h"
+
+#include <ydb/services/bg_tasks/abstract/task.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+void TExecutorController::AssignFinished() const {
+ ExecutorActorId.Send(ExecutorActorId, new TEvAssignFinished());
+}
+
+void TExecutorController::FetchingFinished() const {
+ ExecutorActorId.Send(ExecutorActorId, new TEvFetchingFinished());
+}
+
+void TExecutorController::TaskFetched(const TTask& task) const {
+ ExecutorActorId.Send(ExecutorActorId, new TEvTaskFetched(task));
+}
+
+void TExecutorController::TaskFinished(const TString& taskId) const {
+ ExecutorActorId.Send(ExecutorActorId, new TEvTaskExecutorFinished(taskId));
+}
+
+void TExecutorController::LockPingerFinished() const {
+ ExecutorActorId.Send(ExecutorActorId, new TEvLockPingerFinished());
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.h b/ydb/services/bg_tasks/ds_table/executor_controller.h
new file mode 100644
index 00000000000..3d3aebb9cd3
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/executor_controller.h
@@ -0,0 +1,39 @@
+#pragma once
+#include "config.h"
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actorid.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TTask;
+
+class TExecutorController {
+private:
+ const NActors::TActorIdentity ExecutorActorId;
+ YDB_READONLY_DEF(TConfig, Config);
+public:
+ using TPtr = std::shared_ptr<TExecutorController>;
+ TExecutorController(const NActors::TActorIdentity& executorActorId, const TConfig& config)
+ : ExecutorActorId(executorActorId)
+ , Config(config)
+ {
+
+ }
+
+ TString GetTableName() const {
+ return Config.GetTablePath();
+ }
+
+ const NInternal::NRequest::TConfig& GetRequestConfig() const {
+ return Config.GetRequestConfig();
+ }
+
+ void LockPingerFinished() const;
+ void TaskFetched(const TTask& task) const;
+ void TaskFinished(const TString& taskId) const;
+ void AssignFinished() const;
+ void FetchingFinished() const;
+};
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp b/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp
new file mode 100644
index 00000000000..875ab7aaff1
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp
@@ -0,0 +1,63 @@
+#include "fetch_tasks.h"
+
+#include <ydb/services/bg_tasks/abstract/task.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+void TFetchTasksActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& currentFullReply) {
+ Ydb::Table::ExecuteQueryResult qResult;
+ currentFullReply.operation().result().UnpackTo(&qResult);
+ Y_VERIFY((size_t)qResult.result_sets().size() == 1);
+ TTask::TDecoder decoder(qResult.result_sets()[0]);
+ std::vector<TTask> newTasks;
+ for (auto&& i : qResult.result_sets()[0].rows()) {
+ TTask task;
+ if (!task.DeserializeFromRecord(decoder, i)) {
+ ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse task record";
+ continue;
+ }
+ Controller->TaskFetched(task);
+ }
+ Controller->FetchingFinished();
+}
+
+std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TFetchTasksActor::OnSessionId(const TString& sessionId) {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ sb << "DECLARE $executorId AS String;" << Endl;
+ sb << "DECLARE $lastPingCriticalBorder AS Uint32;" << Endl;
+ if (CurrentTaskIds.size()) {
+ sb << "DECLARE $taskIds AS List<String>;" << Endl;
+ }
+ sb << "SELECT * FROM `" + Controller->GetTableName() + "`" << Endl;
+ sb << "WHERE executorId = $executorId" << Endl;
+ sb << "AND enabled = true" << Endl;
+ sb << "AND lastPing > $lastPingCriticalBorder" << Endl;
+ if (CurrentTaskIds.size()) {
+ sb << " AND id NOT IN ($taskIds)" << Endl;
+ auto& idStrings = (*request.mutable_parameters())["$taskIds"];
+ idStrings.mutable_type()->mutable_list_type();
+ for (auto&& i : CurrentTaskIds) {
+ auto* idString = idStrings.mutable_value()->add_items();
+ idString->set_bytes_value(i);
+ }
+ }
+
+ {
+ auto& param = (*request.mutable_parameters())["$executorId"];
+ param.mutable_value()->set_bytes_value(ExecutorId);
+ param.mutable_type()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& param = (*request.mutable_parameters())["$lastPingCriticalBorder"];
+ param.mutable_value()->set_uint32_value((TActivationContext::Now() - Controller->GetConfig().GetPingCheckPeriod()).Seconds());
+ param.mutable_type()->set_type_id(Ydb::Type::UINT32);
+ }
+ request.mutable_query()->set_yql_text(sb);
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only();
+
+ return request;
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/fetch_tasks.h b/ydb/services/bg_tasks/ds_table/fetch_tasks.h
new file mode 100644
index 00000000000..0eea7a2646a
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/fetch_tasks.h
@@ -0,0 +1,29 @@
+#pragma once
+#include "executor_controller.h"
+
+#include <ydb/services/metadata/request/request_actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TFetchTasksActor: public NInternal::NRequest::TSessionedActor {
+private:
+ using TBase = NInternal::NRequest::TSessionedActor;
+ const std::set<TString> CurrentTaskIds;
+ const TString ExecutorId;
+ TExecutorController::TPtr Controller;
+protected:
+ virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override;
+ virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override;
+
+public:
+ TFetchTasksActor(const std::set<TString>& currentTaskIds, const TString& executorId,
+ TExecutorController::TPtr controller)
+ : TBase(controller->GetRequestConfig())
+ , CurrentTaskIds(currentTaskIds)
+ , ExecutorId(executorId)
+ , Controller(controller)
+ {
+
+ }
+};
+}
diff --git a/ydb/services/bg_tasks/ds_table/finish_task.cpp b/ydb/services/bg_tasks/ds_table/finish_task.cpp
new file mode 100644
index 00000000000..874fa90d373
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/finish_task.cpp
@@ -0,0 +1,27 @@
+#include "finish_task.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TDropTaskActor::OnSessionId(const TString& sessionId) {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ sb << "DECLARE $taskId AS String;" << Endl;
+ sb << "DELETE FROM `" + Controller->GetTableName() + "` ON SELECT $taskId AS id";
+ request.mutable_query()->set_yql_text(sb);
+
+ auto& idString = (*request.mutable_parameters())["$taskId"];
+ idString.mutable_value()->set_bytes_value(TaskId);
+ idString.mutable_type()->set_type_id(Ydb::Type::STRING);
+
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.mutable_tx_control()->set_commit_tx(true);
+
+ return request;
+}
+
+void TDropTaskActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*result*/) {
+ Controller->TaskFinished(TaskId);
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/finish_task.h b/ydb/services/bg_tasks/ds_table/finish_task.h
new file mode 100644
index 00000000000..2b5020d7774
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/finish_task.h
@@ -0,0 +1,26 @@
+#pragma once
+#include "executor_controller.h"
+
+#include <ydb/services/metadata/request/request_actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TDropTaskActor: public NInternal::NRequest::TSessionedActor {
+private:
+ using TBase = NInternal::NRequest::TSessionedActor;
+ const TString TaskId;
+ TExecutorController::TPtr Controller;
+protected:
+ virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override;
+ virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override;
+
+public:
+
+ TDropTaskActor(const TString& taskId, TExecutorController::TPtr controller)
+ : TBase(controller->GetRequestConfig())
+ , TaskId(taskId)
+ , Controller(controller) {
+
+ }
+};
+}
diff --git a/ydb/services/bg_tasks/ds_table/interrupt.cpp b/ydb/services/bg_tasks/ds_table/interrupt.cpp
new file mode 100644
index 00000000000..909c9b2e56c
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/interrupt.cpp
@@ -0,0 +1,44 @@
+#include "interrupt.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TInterruptTaskActor::OnSessionId(const TString& sessionId) {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ sb << "DECLARE $taskId AS String;" << Endl;
+ sb << "DECLARE $stateString AS String;" << Endl;
+ sb << "DECLARE $startInstant AS Uint32;" << Endl;
+ sb << "UPDATE `" + ExecutorController->GetTableName() + "`" << Endl;
+ sb << "SET lastPing = 0" << Endl;
+ sb << ", executorId = null" << Endl;
+ sb << ", startInstant = $startInstant" << Endl;
+ sb << ", state = $stateString" << Endl;
+ sb << "WHERE id = $taskId" << Endl;
+ request.mutable_query()->set_yql_text(sb);
+
+ {
+ auto& param = (*request.mutable_parameters())["$startInstant"];
+ param.mutable_value()->set_uint32_value(NextStartInstant.Seconds());
+ param.mutable_type()->set_type_id(Ydb::Type::UINT32);
+ }
+
+ auto& idString = (*request.mutable_parameters())["$taskId"];
+ idString.mutable_value()->set_bytes_value(TaskId);
+ idString.mutable_type()->set_type_id(Ydb::Type::STRING);
+
+ auto& sString = (*request.mutable_parameters())["$stateString"];
+ sString.mutable_value()->set_bytes_value(State.SerializeToString());
+ sString.mutable_type()->set_type_id(Ydb::Type::STRING);
+
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.mutable_tx_control()->set_commit_tx(true);
+
+ return request;
+}
+
+void TInterruptTaskActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*result*/) {
+ ExecutorController->TaskFinished(TaskId);
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/interrupt.h b/ydb/services/bg_tasks/ds_table/interrupt.h
new file mode 100644
index 00000000000..a48008f00ae
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/interrupt.h
@@ -0,0 +1,40 @@
+#pragma once
+#include "executor_controller.h"
+
+#include <ydb/services/bg_tasks/abstract/state.h>
+#include <ydb/services/metadata/request/request_actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TInterruptTaskActor: public NInternal::NRequest::TSessionedActor {
+private:
+ using TBase = NInternal::NRequest::TSessionedActor;
+ TExecutorController::TPtr ExecutorController;
+ const TString TaskId;
+ const TInstant NextStartInstant;
+ TTaskStateContainer State;
+protected:
+ virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override;
+ virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override;
+
+public:
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogYQLRequest>, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+
+ TInterruptTaskActor(TExecutorController::TPtr executorController,
+ const TString& taskId, const TInstant nextStartInstant, TTaskStateContainer state)
+ : TBase(executorController->GetRequestConfig())
+ , ExecutorController(executorController)
+ , TaskId(taskId)
+ , NextStartInstant(nextStartInstant)
+ , State(state)
+ {
+
+ }
+};
+}
diff --git a/ydb/services/bg_tasks/ds_table/lock_pinger.cpp b/ydb/services/bg_tasks/ds_table/lock_pinger.cpp
new file mode 100644
index 00000000000..17b0be008a8
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/lock_pinger.cpp
@@ -0,0 +1,42 @@
+#include "lock_pinger.h"
+
+#include <ydb/services/bg_tasks/service.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TLockPingerActor::OnSessionId(const TString& sessionId) {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ const auto now = TActivationContext::Now();
+ sb << "DECLARE $taskIds AS List<String>;" << Endl;
+ sb << "DECLARE $lastPingNewValue AS Uint32;" << Endl;
+ sb << "UPDATE `" + ExecutorController->GetTableName() + "`" << Endl;
+ sb << "SET lastPing = $lastPingNewValue" << Endl;
+ sb << "WHERE id IN ($taskIds)" << Endl;
+ request.mutable_query()->set_yql_text(sb);
+
+ {
+ auto& param = (*request.mutable_parameters())["$lastPingNewValue"];
+ param.mutable_value()->set_uint32_value(now.Seconds());
+ param.mutable_type()->set_type_id(Ydb::Type::UINT32);
+ }
+
+ auto& idStrings = (*request.mutable_parameters())["$taskIds"];
+ idStrings.mutable_type()->mutable_list_type();
+ for (auto&& i : TaskIds) {
+ auto* idString = idStrings.mutable_value()->add_items();
+ idString->set_bytes_value(i);
+ }
+
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.mutable_tx_control()->set_commit_tx(true);
+
+ return request;
+}
+
+void TLockPingerActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*ev*/) {
+ ExecutorController->LockPingerFinished();
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/lock_pinger.h b/ydb/services/bg_tasks/ds_table/lock_pinger.h
new file mode 100644
index 00000000000..8c47f7b3f70
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/lock_pinger.h
@@ -0,0 +1,25 @@
+#pragma once
+#include "executor_controller.h"
+
+#include <ydb/services/bg_tasks/abstract/state.h>
+#include <ydb/services/metadata/request/request_actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TLockPingerActor: public NInternal::NRequest::TSessionedActor {
+private:
+ using TBase = NInternal::NRequest::TSessionedActor;
+ const std::set<TString> TaskIds;
+ TExecutorController::TPtr ExecutorController;
+protected:
+ virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override;
+ virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override;
+public:
+ TLockPingerActor(TExecutorController::TPtr executorController, const std::set<TString>& taskIds)
+ : TBase(executorController->GetRequestConfig())
+ , TaskIds(taskIds)
+ , ExecutorController(executorController) {
+ Y_VERIFY(TaskIds.size());
+ }
+};
+}
diff --git a/ydb/services/bg_tasks/ds_table/task_enabled.cpp b/ydb/services/bg_tasks/ds_table/task_enabled.cpp
new file mode 100644
index 00000000000..f56a983eaf0
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/task_enabled.cpp
@@ -0,0 +1,31 @@
+#include "task_enabled.h"
+
+#include <ydb/services/bg_tasks/service.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TUpdateTaskEnabledActor::OnSessionId(const TString& sessionId) {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ sb << "DECLARE $taskId AS String;" << Endl;
+ sb << "UPDATE `" + ExecutorController->GetTableName() + "`" << Endl;
+ sb << "SET enabled = " << Enabled << Endl;
+ sb << "WHERE id = $taskId" << Endl;
+ request.mutable_query()->set_yql_text(sb);
+
+ auto& idString = (*request.mutable_parameters())["$taskId"];
+ idString.mutable_value()->set_bytes_value(TaskId);
+ idString.mutable_type()->set_type_id(Ydb::Type::STRING);
+
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.mutable_tx_control()->set_commit_tx(true);
+
+ return request;
+}
+
+void TUpdateTaskEnabledActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*result*/) {
+ Sender<TEvUpdateTaskEnabledResult>(TaskId, true).SendTo(ResultWaiter);
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/task_enabled.h b/ydb/services/bg_tasks/ds_table/task_enabled.h
new file mode 100644
index 00000000000..776d561079e
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/task_enabled.h
@@ -0,0 +1,32 @@
+#pragma once
+#include "executor_controller.h"
+
+#include <ydb/services/bg_tasks/abstract/state.h>
+#include <ydb/services/metadata/request/request_actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TUpdateTaskEnabledActor: public NInternal::NRequest::TSessionedActor {
+private:
+ using TBase = NInternal::NRequest::TSessionedActor;
+ TExecutorController::TPtr ExecutorController;
+ const TString TaskId;
+ const bool Enabled = false;
+ const TActorId ResultWaiter;
+protected:
+ virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override;
+ virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override;
+
+public:
+ TUpdateTaskEnabledActor(TExecutorController::TPtr executorController,
+ const TString& taskId, const bool enabled, const TActorId& resultWaiter)
+ : TBase(executorController->GetRequestConfig())
+ , ExecutorController(executorController)
+ , TaskId(taskId)
+ , Enabled(enabled)
+ , ResultWaiter(resultWaiter)
+ {
+
+ }
+};
+}
diff --git a/ydb/services/bg_tasks/ds_table/task_executor.cpp b/ydb/services/bg_tasks/ds_table/task_executor.cpp
new file mode 100644
index 00000000000..219986f8a61
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/task_executor.cpp
@@ -0,0 +1,31 @@
+#include "task_executor.h"
+#include "interrupt.h"
+#include "finish_task.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+void TTaskExecutor::Handle(TEvTaskFinished::TPtr& /*ev*/) {
+ Register(new TDropTaskActor(Task.GetId(), OwnerController));
+ Task.Finished();
+ PassAway();
+}
+
+void TTaskExecutor::Handle(TEvTaskInterrupted::TPtr& ev) {
+ auto nextStartInstant = Task.GetScheduler().GetNextStartInstant(Task.GetStartInstant(), ev->Get()->GetTaskState());
+ if (nextStartInstant) {
+ Register(new TInterruptTaskActor(OwnerController, Task.GetId(),
+ *nextStartInstant, ev->Get()->GetTaskState()));
+ } else {
+ Register(new TDropTaskActor(Task.GetId(), OwnerController));
+ Task.Finished();
+ }
+ PassAway();
+}
+
+void TTaskExecutor::Bootstrap() {
+ Become(&TTaskExecutor::StateMain);
+ Controller = std::make_shared<TTaskExecutorController>(SelfId(), Task.GetId(), OwnerController->GetRequestConfig());
+ Task.Execute(Controller);
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/task_executor.h b/ydb/services/bg_tasks/ds_table/task_executor.h
new file mode 100644
index 00000000000..29f8afdcbc3
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/task_executor.h
@@ -0,0 +1,54 @@
+#pragma once
+#include "executor_controller.h"
+
+#include <ydb/services/bg_tasks/abstract/common.h>
+#include <ydb/services/bg_tasks/abstract/task.h>
+#include <ydb/services/bg_tasks/ds_table/task_executor_controller.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TEvTaskFinished: public TEventLocal<TEvTaskFinished, EEvents::EvTaskFinished> {
+};
+
+class TEvTaskInterrupted: public TEventLocal<TEvTaskInterrupted, EEvents::EvTaskInterrupted> {
+private:
+ YDB_READONLY_DEF(ITaskState::TPtr, TaskState);
+public:
+ TEvTaskInterrupted(ITaskState::TPtr taskState)
+ : TaskState(taskState)
+ {
+
+ }
+};
+
+class TTaskExecutor: public NActors::TActorBootstrapped<TTaskExecutor> {
+private:
+ TExecutorController::TPtr OwnerController;
+ TTaskExecutorController::TPtr Controller;
+ TTask Task;
+public:
+ TTaskExecutor(const TTask& task, TExecutorController::TPtr controller)
+ : OwnerController(controller)
+ , Task(task)
+ {
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTaskFinished, Handle);
+ hFunc(TEvTaskInterrupted, Handle);
+ default:
+ break;
+ }
+ }
+
+ void Bootstrap();
+
+ void Handle(TEvTaskFinished::TPtr& /*ev*/);
+
+ void Handle(TEvTaskInterrupted::TPtr& ev);
+};
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp b/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp
new file mode 100644
index 00000000000..68255ef5175
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp
@@ -0,0 +1,15 @@
+#include "task_executor.h"
+#include "task_executor_controller.h"
+#include <ydb/services/bg_tasks/abstract/task.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+void TTaskExecutorController::DoTaskInterrupted(ITaskState::TPtr actualTask) {
+ TaskExecutorId.Send(TaskExecutorId, new TEvTaskInterrupted(actualTask));
+}
+
+void TTaskExecutorController::DoTaskFinished() {
+ TaskExecutorId.Send(TaskExecutorId, new TEvTaskFinished());
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/task_executor_controller.h b/ydb/services/bg_tasks/ds_table/task_executor_controller.h
new file mode 100644
index 00000000000..2542d0d296e
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/task_executor_controller.h
@@ -0,0 +1,28 @@
+#pragma once
+#include <ydb/services/metadata/request/config.h>
+#include <ydb/services/bg_tasks/abstract/task.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TTaskExecutorController: public ITaskExecutorController {
+private:
+ const NActors::TActorIdentity TaskExecutorId;
+ YDB_READONLY_DEF(TString, TaskId);
+ YDB_READONLY_DEF(NInternal::NRequest::TConfig, RequestConfig);
+protected:
+ virtual void DoTaskInterrupted(ITaskState::TPtr actualTask) override;
+ virtual void DoTaskFinished() override;
+public:
+ TTaskExecutorController(const NActors::TActorIdentity& executorId,
+ const TString& taskId, const NInternal::NRequest::TConfig& requestConfig)
+ : TaskExecutorId(executorId)
+ , TaskId(taskId)
+ , RequestConfig(requestConfig) {
+
+ }
+
+};
+
+}
diff --git a/ydb/services/bg_tasks/protos/CMakeLists.txt b/ydb/services/bg_tasks/protos/CMakeLists.txt
new file mode 100644
index 00000000000..94a58a1da93
--- /dev/null
+++ b/ydb/services/bg_tasks/protos/CMakeLists.txt
@@ -0,0 +1,31 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-bg_tasks-protos)
+target_link_libraries(services-bg_tasks-protos PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(services-bg_tasks-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/protos/container.proto
+)
+target_proto_addincls(services-bg_tasks-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(services-bg_tasks-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/services/bg_tasks/protos/container.proto b/ydb/services/bg_tasks/protos/container.proto
new file mode 100644
index 00000000000..c33822b77d4
--- /dev/null
+++ b/ydb/services/bg_tasks/protos/container.proto
@@ -0,0 +1,6 @@
+package NKikimrProto;
+
+message TStringContainer {
+ optional string ClassName = 1;
+ optional bytes BinaryData = 2;
+}
diff --git a/ydb/services/bg_tasks/service.cpp b/ydb/services/bg_tasks/service.cpp
new file mode 100644
index 00000000000..9c1b0dec5b0
--- /dev/null
+++ b/ydb/services/bg_tasks/service.cpp
@@ -0,0 +1,9 @@
+#include "service.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+NActors::TActorId MakeServiceId(const ui32 nodeId) {
+ return NActors::TActorId(nodeId, "SrvcBgrdTask");
+}
+
+}
diff --git a/ydb/services/bg_tasks/service.h b/ydb/services/bg_tasks/service.h
new file mode 100644
index 00000000000..0a022c60a56
--- /dev/null
+++ b/ydb/services/bg_tasks/service.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include <ydb/services/bg_tasks/abstract/common.h>
+#include <ydb/services/bg_tasks/abstract/task.h>
+
+#include <library/cpp/actors/core/event_local.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TEvAddTask: public NActors::TEventLocal<TEvAddTask, EEvents::EvAddTask> {
+private:
+ YDB_READONLY_DEF(TTask, Task);
+public:
+ TEvAddTask(TTask&& task)
+ : Task(std::move(task)) {
+ Y_VERIFY(!!Task.GetActivity());
+ }
+};
+
+template <class TDerived, const ui32 evType>
+class TEvCommonResult: public NActors::TEventLocal<TDerived, evType> {
+private:
+ YDB_READONLY_DEF(TString, TaskId);
+ YDB_READONLY_FLAG(Success, false);
+ YDB_ACCESSOR_DEF(TString, Report);
+public:
+ TEvCommonResult(const TString& taskId, const bool success, const TString& report = Default<TString>())
+ : TaskId(taskId)
+ , SuccessFlag(success)
+ , Report(report)
+ {
+ Y_VERIFY(!!TaskId);
+ }
+
+ TString GetDebugString() const {
+ TStringBuilder sb;
+ sb << "id=" << TaskId << ";";
+ sb << "success=" << SuccessFlag << ";";
+ if (!SuccessFlag) {
+ sb << "report=" << Report << ";";
+ }
+ return sb;
+ }
+};
+
+class TEvAddTaskResult: public TEvCommonResult<TEvAddTaskResult, EEvents::EvAddTaskResult> {
+private:
+ using TBase = TEvCommonResult<TEvAddTaskResult, EEvents::EvAddTaskResult>;
+public:
+ using TBase::TBase;
+};
+
+class TEvUpdateTaskEnabled: public NActors::TEventLocal<TEvUpdateTaskEnabled, EEvents::EvUpdateTaskEnabled> {
+private:
+ YDB_READONLY_DEF(TString, TaskId);
+ YDB_ACCESSOR(bool, Enabled, true);
+public:
+ TEvUpdateTaskEnabled(const TString& taskId, const bool enabled)
+ : TaskId(taskId)
+ , Enabled(enabled)
+ {
+ Y_VERIFY(!!TaskId);
+ }
+};
+
+class TEvUpdateTaskEnabledResult: public TEvCommonResult<TEvUpdateTaskEnabledResult, EEvents::EvUpdateTaskEnabledResult> {
+private:
+ using TBase = TEvCommonResult<TEvUpdateTaskEnabledResult, EEvents::EvUpdateTaskEnabledResult>;
+public:
+ using TBase::TBase;
+};
+
+NActors::TActorId MakeServiceId(const ui32 nodeId);
+
+}
diff --git a/ydb/services/bg_tasks/ut/CMakeLists.darwin.txt b/ydb/services/bg_tasks/ut/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..74df8ceb913
--- /dev/null
+++ b/ydb/services/bg_tasks/ut/CMakeLists.darwin.txt
@@ -0,0 +1,57 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-bg_tasks-ut)
+target_compile_options(ydb-services-bg_tasks-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-bg_tasks-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks
+)
+target_link_libraries(ydb-services-bg_tasks-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-bg_tasks
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ ydb-core-testlib
+ public-lib-yson_value
+ cpp-client-ydb_table
+ services-bg_tasks-abstract
+ cpp-testing-unittest
+ yql-parser-pg_wrapper
+ yql-sql-pg
+)
+target_link_options(ydb-services-bg_tasks-ut PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-services-bg_tasks-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/ut_tasks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/task_emulator.cpp
+)
+add_test(
+ NAME
+ ydb-services-bg_tasks-ut
+ COMMAND
+ ydb-services-bg_tasks-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-services-bg_tasks-ut)
diff --git a/ydb/services/bg_tasks/ut/CMakeLists.linux-aarch64.txt b/ydb/services/bg_tasks/ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..266c1f57e62
--- /dev/null
+++ b/ydb/services/bg_tasks/ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,59 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-bg_tasks-ut)
+target_compile_options(ydb-services-bg_tasks-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-bg_tasks-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks
+)
+target_link_libraries(ydb-services-bg_tasks-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ cpp-testing-unittest_main
+ ydb-services-bg_tasks
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ ydb-core-testlib
+ public-lib-yson_value
+ cpp-client-ydb_table
+ services-bg_tasks-abstract
+ cpp-testing-unittest
+ yql-parser-pg_wrapper
+ yql-sql-pg
+)
+target_link_options(ydb-services-bg_tasks-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-bg_tasks-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/ut_tasks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/task_emulator.cpp
+)
+add_test(
+ NAME
+ ydb-services-bg_tasks-ut
+ COMMAND
+ ydb-services-bg_tasks-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-services-bg_tasks-ut)
diff --git a/ydb/services/bg_tasks/ut/CMakeLists.linux.txt b/ydb/services/bg_tasks/ut/CMakeLists.linux.txt
new file mode 100644
index 00000000000..f3fe126c7d1
--- /dev/null
+++ b/ydb/services/bg_tasks/ut/CMakeLists.linux.txt
@@ -0,0 +1,61 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-bg_tasks-ut)
+target_compile_options(ydb-services-bg_tasks-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-bg_tasks-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks
+)
+target_link_libraries(ydb-services-bg_tasks-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-bg_tasks
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ ydb-core-testlib
+ public-lib-yson_value
+ cpp-client-ydb_table
+ services-bg_tasks-abstract
+ cpp-testing-unittest
+ yql-parser-pg_wrapper
+ yql-sql-pg
+)
+target_link_options(ydb-services-bg_tasks-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-bg_tasks-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/ut_tasks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/task_emulator.cpp
+)
+add_test(
+ NAME
+ ydb-services-bg_tasks-ut
+ COMMAND
+ ydb-services-bg_tasks-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-services-bg_tasks-ut)
diff --git a/ydb/services/bg_tasks/ut/CMakeLists.txt b/ydb/services/bg_tasks/ut/CMakeLists.txt
new file mode 100644
index 00000000000..3e0811fb22e
--- /dev/null
+++ b/ydb/services/bg_tasks/ut/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/services/bg_tasks/ut/task_emulator.cpp b/ydb/services/bg_tasks/ut/task_emulator.cpp
new file mode 100644
index 00000000000..ccf05e7f5a7
--- /dev/null
+++ b/ydb/services/bg_tasks/ut/task_emulator.cpp
@@ -0,0 +1,50 @@
+#include "task_emulator.h"
+
+namespace NKikimr {
+
+TTestInsertTaskActivity::TFactory::TRegistrator<TTestInsertTaskActivity> TTestInsertTaskActivity::Registrator(TTestInsertTaskState::GetClassNameStatic());
+TTestInsertTaskScheduler::TFactory::TRegistrator<TTestInsertTaskScheduler> TTestInsertTaskScheduler::Registrator(TTestInsertTaskState::GetClassNameStatic());
+TTestInsertTaskState::TFactory::TRegistrator<TTestInsertTaskState> TTestInsertTaskState::Registrator(TTestInsertTaskState::GetClassNameStatic());
+
+namespace {
+TMutex Mutex;
+std::map<TString, ui32> CounterSumByActivityId;
+std::map<TString, bool> FinishedByActivityId;
+}
+
+void TTestInsertTaskActivity::DoFinished(const NBackgroundTasks::TTaskStateContainer& /*state*/) {
+ TGuard<TMutex> g(Mutex);
+ FinishedByActivityId[ActivityTaskId] = true;
+}
+
+void TTestInsertTaskActivity::DoExecute(NBackgroundTasks::ITaskExecutorController::TPtr controller, const NBackgroundTasks::TTaskStateContainer& currentState) {
+ TGuard<TMutex> g(Mutex);
+ const ui32 c = currentState.HasObject() ?
+ currentState.GetAsSafe<TTestInsertTaskState>().GetCounter() :
+ 0;
+ Cerr << "TASK EXECUTED: " << c << Endl;
+ CounterSumByActivityId[ActivityTaskId] += c;
+ controller->TaskInterrupted(std::make_shared<TTestInsertTaskState>(c + 1));
+}
+
+bool TTestInsertTaskActivity::IsFinished(const TString& id) {
+ TGuard<TMutex> g(Mutex);
+ auto it = FinishedByActivityId.find(id);
+ if (it == FinishedByActivityId.end()) {
+ return false;
+ } else {
+ return it->second;
+ }
+}
+
+ui32 TTestInsertTaskActivity::GetCounterSum(const TString& id) {
+ TGuard<TMutex> g(Mutex);
+ auto it = CounterSumByActivityId.find(id);
+ if (it == CounterSumByActivityId.end()) {
+ return 0;
+ } else {
+ return it->second;
+ }
+}
+
+}
diff --git a/ydb/services/bg_tasks/ut/task_emulator.h b/ydb/services/bg_tasks/ut/task_emulator.h
new file mode 100644
index 00000000000..4ca71487667
--- /dev/null
+++ b/ydb/services/bg_tasks/ut/task_emulator.h
@@ -0,0 +1,101 @@
+#include <ydb/services/bg_tasks/abstract/activity.h>
+#include <ydb/services/bg_tasks/abstract/task.h>
+#include <ydb/services/bg_tasks/service.h>
+
+#include <util/system/hostname.h>
+
+namespace NKikimr {
+
+ class TTestInsertTaskState: public NBackgroundTasks::IJsonStringSerializable<NBackgroundTasks::ITaskState> {
+ private:
+ static TFactory::TRegistrator<TTestInsertTaskState> Registrator;
+
+ YDB_ACCESSOR(ui32, Counter, 0);
+ protected:
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("counter", Counter);
+ return result;
+ }
+ virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonData) override {
+ Counter = jsonData["counter"].GetUIntegerSafe();
+ return true;
+ }
+ public:
+ TTestInsertTaskState() = default;
+ TTestInsertTaskState(const ui32 counter)
+ : Counter(counter)
+ {
+
+ }
+ static TString GetClassNameStatic() {
+ return "test_insert";
+ }
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+ };
+
+ class TTestInsertTaskScheduler: public NBackgroundTasks::IJsonStringSerializable<NBackgroundTasks::ITaskScheduler> {
+ private:
+ static TFactory::TRegistrator<TTestInsertTaskScheduler> Registrator;
+
+ YDB_ACCESSOR(ui32, Limit, 3);
+ protected:
+ virtual TInstant DoGetStartInstant() const override {
+ return TInstant::Zero();
+ }
+
+ virtual std::optional<TInstant> DoGetNextStartInstant(
+ const TInstant /*currentStartInstant*/, const NBackgroundTasks::TTaskStateContainer& state) const override {
+ if (state.GetAsSafe<TTestInsertTaskState>().GetCounter() <= Limit) {
+ return TInstant::Zero();
+ } else {
+ return {};
+ }
+ }
+
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("limit", Limit);
+ return result;
+ }
+ virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonData) override {
+ Limit = jsonData["limit"].GetUIntegerSafe();
+ return true;
+ }
+ public:
+ TTestInsertTaskScheduler() = default;
+ virtual TString GetClassName() const override {
+ return TTestInsertTaskState::GetClassNameStatic();
+ }
+ };
+
+ class TTestInsertTaskActivity: public NBackgroundTasks::IJsonStringSerializable<NBackgroundTasks::ITaskActivity> {
+ private:
+ static TFactory::TRegistrator<TTestInsertTaskActivity> Registrator;
+ YDB_READONLY(TString, ActivityTaskId, TGUID::CreateTimebased().AsUuidString());
+
+ protected:
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("id", ActivityTaskId);
+ return result;
+ }
+ virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonData) override {
+ ActivityTaskId = jsonData["id"].GetStringRobust();
+ return true;
+ }
+ virtual void DoFinished(const NBackgroundTasks::TTaskStateContainer& /*state*/) override;
+
+ virtual void DoExecute(NBackgroundTasks::ITaskExecutorController::TPtr controller,
+ const NBackgroundTasks::TTaskStateContainer& currentState) override;
+ public:
+ static bool IsFinished(const TString& id);
+ static ui32 GetCounterSum(const TString& id);
+ virtual TString GetClassName() const override {
+ return TTestInsertTaskState::GetClassNameStatic();
+ }
+ };
+}
+
diff --git a/ydb/services/bg_tasks/ut/ut_tasks.cpp b/ydb/services/bg_tasks/ut/ut_tasks.cpp
new file mode 100644
index 00000000000..5e603467fb1
--- /dev/null
+++ b/ydb/services/bg_tasks/ut/ut_tasks.cpp
@@ -0,0 +1,70 @@
+#include "task_emulator.h"
+#include <ydb/core/testlib/test_client.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/services/bg_tasks/abstract/activity.h>
+#include <ydb/services/bg_tasks/abstract/task.h>
+#include <ydb/services/bg_tasks/service.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+#include <util/system/hostname.h>
+
+namespace NKikimr {
+
+Y_UNIT_TEST_SUITE(BGTaskTests) {
+ Y_UNIT_TEST(DSRunTask) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(false)
+ .SetEnableBackgroundTasks(true)
+ .SetEnableOlapSchemaOperations(true);
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
+ runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_INFO);
+ runtime.SetLogPriority(NKikimrServices::METADATA_PROVIDER, NLog::PRI_INFO);
+ runtime.SetLogPriority(NKikimrServices::BG_TASKS, NLog::PRI_DEBUG);
+ // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ Cerr << "Initialization finished" << Endl;
+
+ TString activityId;
+ {
+ std::shared_ptr<TTestInsertTaskActivity> tActivity(new TTestInsertTaskActivity);
+ activityId = tActivity->GetActivityTaskId();
+ std::shared_ptr<TTestInsertTaskScheduler> tScheduler(new TTestInsertTaskScheduler);
+ NBackgroundTasks::TTask newTask(tActivity, tScheduler);
+ runtime.SendAsync(new IEventHandle(NBackgroundTasks::MakeServiceId(1), {}, new NBackgroundTasks::TEvAddTask(std::move(newTask))));
+ }
+ TDispatchOptions rmReady;
+ rmReady.CustomFinalCondition = [activityId] {
+ Y_VERIFY(TTestInsertTaskActivity::GetCounterSum(activityId) <= 6);
+ if (TTestInsertTaskActivity::IsFinished(activityId)) {
+ Y_VERIFY(TTestInsertTaskActivity::GetCounterSum(activityId) == 6);
+ return true;
+ } else {
+ Cerr << "COUNTER_SUM:" << TTestInsertTaskActivity::GetCounterSum(activityId) << Endl;
+ }
+ return false;
+ };
+ Y_VERIFY(runtime.DispatchEvents(rmReady, TDuration::Seconds(30)));
+ }
+}
+}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp
index 80ded24e461..195c700d419 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.cpp
+++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp
@@ -18,9 +18,7 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetTables().size());
*CurrentSelection.mutable_result_sets() = std::move(*qResult.mutable_result_sets());
auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(CurrentSelection, RequestedActuality);
- if (!!parsedSnapshot) {
- CurrentSnapshot = parsedSnapshot;
- } else {
+ if (!parsedSnapshot) {
ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot";
}
@@ -36,6 +34,8 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
}
}
);
+ } else {
+ Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
}
}
@@ -80,9 +80,14 @@ void TDSAccessorRefresher::OnInitialized() {
}
TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor)
- : TBase(config, snapshotConstructor->GetTableSchema())
+ : TBase(config.GetRequestConfig())
, SnapshotConstructor(snapshotConstructor)
+ , Config(config)
{
}
+TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TDSAccessorRefresher::BuildModifiers() const {
+ return SnapshotConstructor->GetTableSchema();
+}
+
}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h
index 7d883893a83..39da63b99a1 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.h
+++ b/ydb/services/metadata/ds_table/accessor_refresh.h
@@ -39,12 +39,14 @@ private:
YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot);
YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection);
TInstant RequestedActuality = TInstant::Zero();
+ const TConfig Config;
protected:
bool IsReady() const {
return !!CurrentSnapshot;
}
virtual void OnInitialized() override;
virtual void OnSnapshotModified() = 0;
+ virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const override;
public:
using TBase::Handle;
diff --git a/ydb/services/metadata/ds_table/config.h b/ydb/services/metadata/ds_table/config.h
index 372f446771c..228b3e637b6 100644
--- a/ydb/services/metadata/ds_table/config.h
+++ b/ydb/services/metadata/ds_table/config.h
@@ -14,7 +14,6 @@ private:
public:
TConfig() = default;
- TDuration GetRetryPeriod(const ui32 retry) const;
bool DeserializeFromProto(const NKikimrConfig::TMetadataProviderConfig& config);
};
}
diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp
index 99cf9a5a698..7536ff85f74 100644
--- a/ydb/services/metadata/initializer/accessor_init.cpp
+++ b/ydb/services/metadata/initializer/accessor_init.cpp
@@ -5,28 +5,36 @@ namespace NKikimr::NMetadataProvider {
void TDSAccessorInitialized::Bootstrap() {
RegisterState();
- Modifiers.front()->Execute(SelfId(), Config.GetRequestConfig());
+ auto modifiers = BuildModifiers();
+ for (auto&& i : modifiers) {
+ Modifiers.emplace_back(i);
+ }
+ if (Modifiers.size()) {
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "modifiers count: " << Modifiers.size();
+ Modifiers.front()->Execute(SelfId(), Config);
+ } else {
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "initialization finished";
+ OnInitialized();
+ }
}
void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) {
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "modifiers count: " << Modifiers.size();
if (Modifiers.empty()) {
return;
}
Modifiers.pop_front();
if (Modifiers.size()) {
- Modifiers.front()->Execute(SelfId(), Config.GetRequestConfig());
+ Modifiers.front()->Execute(SelfId(), Config);
} else {
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "initialization finished";
OnInitialized();
}
}
-TDSAccessorInitialized::TDSAccessorInitialized(const TConfig& config, const TVector<ITableModifier::TPtr>& modifiers)
+TDSAccessorInitialized::TDSAccessorInitialized(const NInternal::NRequest::TConfig& config)
: Config(config)
{
- Y_VERIFY(modifiers.size());
- for (auto&& i : modifiers) {
- Modifiers.emplace_back(i);
- }
}
}
diff --git a/ydb/services/metadata/initializer/accessor_init.h b/ydb/services/metadata/initializer/accessor_init.h
index d278408ae2e..b7086c12fef 100644
--- a/ydb/services/metadata/initializer/accessor_init.h
+++ b/ydb/services/metadata/initializer/accessor_init.h
@@ -13,13 +13,14 @@ namespace NKikimr::NMetadataProvider {
class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInitialized> {
private:
TDeque<ITableModifier::TPtr> Modifiers;
+ const NInternal::NRequest::TConfig Config;
protected:
- const TConfig& Config;
virtual void RegisterState() = 0;
virtual void OnInitialized() = 0;
+ virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const = 0;
public:
void Bootstrap();
- TDSAccessorInitialized(const TConfig& config, const TVector<ITableModifier::TPtr>& modifiers);
+ TDSAccessorInitialized(const NInternal::NRequest::TConfig& config);
void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev);
STATEFN(StateMain) {
diff --git a/ydb/services/metadata/request/common.h b/ydb/services/metadata/request/common.h
index eb7100c9389..9fb0a8b5f2b 100644
--- a/ydb/services/metadata/request/common.h
+++ b/ydb/services/metadata/request/common.h
@@ -18,10 +18,15 @@ enum EEvents {
EvSelectRequest,
EvSelectInternalResponse,
EvSelectResponse,
+ EvYQLRequest,
+ EvYQLInternalResponse,
+ EvGeneralYQLResponse,
EvCreateSessionRequest,
EvCreateSessionInternalResponse,
EvCreateSessionResponse,
EvRequestFinished,
+ EvRequestFailed,
+ EvRequestStart,
EvEnd
};
diff --git a/ydb/services/metadata/request/request_actor.h b/ydb/services/metadata/request/request_actor.h
index a5bf6064abc..a6cc0a51641 100644
--- a/ydb/services/metadata/request/request_actor.h
+++ b/ydb/services/metadata/request/request_actor.h
@@ -2,8 +2,6 @@
#include "common.h"
#include "config.h"
-#include <library/cpp/actors/core/actor_virtual.h>
-#include <library/cpp/actors/core/av_bootstrapped.h>
#include <library/cpp/actors/core/log.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/grpc_services/base/base.h>
@@ -29,6 +27,16 @@ using TDialogSelect = TDialogPolicyImpl<Ydb::Table::ExecuteDataQueryRequest, Ydb
using TDialogCreateSession = TDialogPolicyImpl<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse,
EEvents::EvCreateSessionRequest, EEvents::EvCreateSessionInternalResponse, EEvents::EvCreateSessionResponse>;
+template <ui32 evResult = EEvents::EvGeneralYQLResponse>
+using TCustomDialogYQLRequest = TDialogPolicyImpl<Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse,
+ EEvents::EvYQLRequest, EEvents::EvYQLInternalResponse, evResult>;
+template <ui32 evResult = EEvents::EvCreateSessionResponse>
+using TCustomDialogCreateSpecialSession = TDialogPolicyImpl<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse,
+ EEvents::EvCreateSessionRequest, EEvents::EvCreateSessionInternalResponse, evResult>;
+
+using TDialogYQLRequest = TCustomDialogYQLRequest<EEvents::EvGeneralYQLResponse>;
+using TDialogCreateSpecialSession = TCustomDialogCreateSpecialSession<EEvents::EvCreateSessionResponse>;
+
template <class TDialogPolicy>
class TEvRequestResult: public NActors::TEventLocal<TEvRequestResult<TDialogPolicy>, TDialogPolicy::EvResult> {
private:
@@ -42,7 +50,14 @@ public:
class TEvRequestFinished: public NActors::TEventLocal<TEvRequestFinished, EEvents::EvRequestFinished> {
public:
- TEvRequestFinished() = default;
+};
+
+class TEvRequestStart: public NActors::TEventLocal<TEvRequestStart, EEvents::EvRequestStart> {
+public:
+};
+
+class TEvRequestFailed: public NActors::TEventLocal<TEvRequestFailed, EEvents::EvRequestFailed> {
+public:
};
template <class TResponse>
@@ -71,6 +86,7 @@ private:
using TSelf = TYDBRequest<TDialogPolicy>;
TRequest ProtoRequest;
const NActors::TActorId ActorFinishId;
+ const NActors::TActorId ActorRestartId;
const TConfig& Config;
ui32 Retry = 0;
protected:
@@ -83,10 +99,6 @@ protected:
}
};
- class TEvRequestStart: public NActors::TEventLocal<TEvRequestStart, TDialogPolicy::EvStart> {
- public:
- };
-
public:
void Bootstrap(const TActorContext& /*ctx*/) {
TBase::Become(&TBase::TThis::StateMain);
@@ -104,14 +116,22 @@ public:
void Handle(typename TEvRequestInternalResult::TPtr& ev) {
if (!ev->Get()->GetFuture().HasValue() || ev->Get()->GetFuture().HasException()) {
ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive result on initialization";
- TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
+ if (ActorRestartId) {
+ TBase::template Sender<TEvRequestFailed>().SendTo(ActorRestartId);
+ } else {
+ TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
+ }
return;
}
auto f = ev->Get()->GetFuture();
TResponse response = f.ExtractValue();
if (!TOperatorChecker<TResponse>::IsSuccess(response)) {
ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "incorrect reply: " << response.DebugString();
- TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
+ if (ActorRestartId) {
+ TBase::template Sender<TEvRequestFailed>().SendTo(ActorRestartId);
+ } else {
+ TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
+ }
return;
}
TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId);
@@ -131,13 +151,79 @@ public:
result.Subscribe(replyCallback);
}
- TYDBRequest(const TRequest& request, const NActors::TActorId actorFinishId, const TConfig& config)
+ TYDBRequest(const TRequest& request, const NActors::TActorId actorFinishId, const TConfig& config, const NActors::TActorId& actorRestartId = {})
: ProtoRequest(request)
, ActorFinishId(actorFinishId)
+ , ActorRestartId(actorRestartId)
, Config(config)
{
}
};
+template <class TDialogPolicy>
+class TSessionedActorImpl: public NActors::TActorBootstrapped<TSessionedActorImpl<TDialogPolicy>> {
+private:
+ ui32 Retry = 0;
+ const TActorId FinishedActorId;
+
+ static_assert(!std::is_same<TDialogPolicy, TDialogCreateSession>());
+ using TBase = NActors::TActorBootstrapped<TSessionedActorImpl<TDialogPolicy>>;
+ void Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) {
+ Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult();
+ Ydb::Table::CreateSessionResult session;
+ currentFullReply.operation().result().UnpackTo(&session);
+ const TString sessionId = session.session_id();
+ Y_VERIFY(sessionId);
+ std::optional<typename TDialogPolicy::TRequest> nextRequest = OnSessionId(sessionId);
+ Y_VERIFY(nextRequest);
+ TBase::Register(new TYDBRequest<TDialogPolicy>(*nextRequest, TBase::SelfId(), Config, TBase::SelfId()));
+ }
+protected:
+ const NInternal::NRequest::TConfig Config;
+ virtual std::optional<typename TDialogPolicy::TRequest> OnSessionId(const TString& sessionId) = 0;
+ virtual void OnResult(const typename TDialogPolicy::TResponse& response) = 0;
+public:
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvRequestResult<TDialogCreateSession>, Handle);
+ hFunc(TEvRequestFailed, Handle);
+ hFunc(TEvRequestStart, Handle);
+ hFunc(TEvRequestResult<TDialogPolicy>, Handle);
+ default:
+ break;
+ }
+ }
+
+ TSessionedActorImpl(const NInternal::NRequest::TConfig& config)
+ : Config(config)
+ {
+
+ }
+
+ void Handle(typename TEvRequestResult<TDialogPolicy>::TPtr& ev) {
+ OnResult(ev->Get()->GetResult());
+ TBase::PassAway();
+ }
+
+ void Handle(typename TEvRequestFailed::TPtr& /*ev*/) {
+ TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
+ }
+
+ void Handle(typename TEvRequestFinished::TPtr& /*ev*/) {
+ Retry = 0;
+ }
+
+ void Handle(typename TEvRequestStart::TPtr& /*ev*/) {
+ TBase::Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), TBase::SelfId(), Config, TBase::SelfId()));
+ }
+
+ void Bootstrap() {
+ TBase::Become(&TSessionedActorImpl::StateMain);
+ TBase::template Sender<TEvRequestStart>().SendTo(TBase::SelfId());
+ }
+};
+
+using TSessionedActor = TSessionedActorImpl<TDialogYQLRequest>;
+
}