diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-07 16:34:06 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-07 16:34:06 +0300 |
commit | 964584075fe8d2284078222e1e7e2e424c8bd9a1 (patch) | |
tree | 317d2d43c895c729d14d2d601d772e7332348f9d | |
parent | 61143d004cc8a41d5ee751902baf8b47936aa910 (diff) | |
download | ydb-964584075fe8d2284078222e1e7e2e424c8bd9a1.tar.gz |
add background tasks service
67 files changed, 2487 insertions, 25 deletions
diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt index 1dad9985c24..0b09685d4ab 100644 --- a/ydb/core/driver_lib/run/CMakeLists.txt +++ b/ydb/core/driver_lib/run/CMakeLists.txt @@ -124,6 +124,8 @@ target_link_libraries(run PUBLIC ydb-services-local_discovery services-metadata-ds_table ydb-services-metadata + services-bg_tasks-ds_table + ydb-services-bg_tasks ydb-services-monitoring ydb-services-persqueue_cluster_discovery ydb-services-persqueue_v1 diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 37119176f82..f5e0d339367 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -151,6 +151,9 @@ #include <ydb/services/metadata/ds_table/service.h> #include <ydb/services/metadata/service.h> +#include <ydb/services/bg_tasks/ds_table/executor.h> +#include <ydb/services/bg_tasks/service.h> + #include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -2012,7 +2015,25 @@ void TMetadataProviderInitializer::InitializeServices(NActors::TActorSystemSetup if (serviceConfig.IsEnabled()) { auto service = NMetadataProvider::CreateService(serviceConfig); setup->LocalServices.push_back(std::make_pair( - NMetadataProvider::MakeServiceId(), + NMetadataProvider::MakeServiceId(NodeId), + TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); + } +} + +TBackgroundTasksInitializer::TBackgroundTasksInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) { +} + +void TBackgroundTasksInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + NBackgroundTasks::TConfig serviceConfig; + if (Config.HasBackgroundTasksConfig()) { + Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetBackgroundTasksConfig())); + } + + if (serviceConfig.IsEnabled()) { + auto service = NBackgroundTasks::CreateService(serviceConfig); + setup->LocalServices.push_back(std::make_pair( + NBackgroundTasks::MakeServiceId(NodeId), TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); } } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 9ec1bdccf4f..f4bf8d00bd4 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -389,6 +389,12 @@ public: void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; }; +class TBackgroundTasksInitializer: public IKikimrServicesInitializer { +public: + TBackgroundTasksInitializer(const TKikimrRunConfig& runConfig); + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + class TMemoryLogInitializer : public IKikimrServicesInitializer { size_t LogBufferSize = 0; size_t LogGrainSize = 0; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2ef714b7273..0f42c3b819d 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -585,6 +585,16 @@ message TMetadataProviderConfig { optional TInternalRequestConfig RequestConfig = 3; } +message TBackgroundTasksConfig { + optional bool Enabled = 1 [default = false]; + optional TInternalRequestConfig RequestConfig = 2; + optional uint32 PullPeriodSeconds = 3 [default = 10]; + optional uint32 PingPeriodSeconds = 4 [default = 2]; + optional uint32 PingCheckPeriodSeconds = 5 [default = 20]; + optional uint32 MaxInFlight = 6 [default = 8]; + optional string InternalTablePath = 7; +} + message TMemoryLogConfig { optional uint64 LogBufferSize = 1; optional uint64 LogGrainSize = 2; @@ -1642,6 +1652,7 @@ message TAppConfig { optional TFailureInjectionConfig FailureInjectionConfig = 56; optional THttpProxyConfig PublicHttpConfig = 57; optional TMetadataProviderConfig MetadataProviderConfig = 59; + optional TBackgroundTasksConfig BackgroundTasksConfig = 60; optional NYq.NConfig.TConfig YandexQueryConfig = 50; // TODO: remove after migration to FederatedQueryConfig diff --git a/ydb/core/testlib/CMakeLists.txt b/ydb/core/testlib/CMakeLists.txt index 208914902ec..636e2c0d694 100644 --- a/ydb/core/testlib/CMakeLists.txt +++ b/ydb/core/testlib/CMakeLists.txt @@ -92,6 +92,8 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-rate_limiter ydb-services-monitoring services-metadata-ds_table + services-bg_tasks-ds_table + ydb-services-bg_tasks ydb-services-ydb ydb-services-yq ydb-core-http_proxy diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index d3ace1b5383..6c1e1d33d80 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -90,6 +90,8 @@ #include <ydb/core/yq/libs/mock/yql_mock.h> #include <ydb/services/metadata/ds_table/service.h> #include <ydb/services/metadata/service.h> +#include <ydb/services/bg_tasks/ds_table/executor.h> +#include <ydb/services/bg_tasks/service.h> #include <ydb/library/folder_service/mock/mock_folder_service.h> #include <ydb/core/client/server/msgbus_server_tracer.h> @@ -628,6 +630,11 @@ namespace Tests { const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NMetadataProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid); } + if (Settings->IsEnableBackgroundTasks()) { + auto* actor = NBackgroundTasks::CreateService(NBackgroundTasks::TConfig()); + const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid); + } Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index b69246cd534..649cb5d0f7a 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -208,6 +208,7 @@ namespace Tests { TServerSettings& operator=(const TServerSettings& settings) = default; private: YDB_FLAG_ACCESSOR(EnableMetadataProvider, true); + YDB_FLAG_ACCESSOR(EnableBackgroundTasks, false); }; class TServer : public TThrRefBase, TMoveOnly { diff --git a/ydb/core/tx/tiering/decoder.cpp b/ydb/core/tx/tiering/decoder.cpp index 40196235569..f8b0c5a190e 100644 --- a/ydb/core/tx/tiering/decoder.cpp +++ b/ydb/core/tx/tiering/decoder.cpp @@ -31,6 +31,34 @@ bool TDecoderBase::Read(const ui32 columnIdx, TDuration& result, const Ydb::Valu return true; } +bool TDecoderBase::Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) const { + auto& pValue = r.items()[columnIdx]; + if (pValue.has_bool_value()) { + result = pValue.bool_value(); + } else { + ALS_WARN(0) << "incorrect type for instant seconds parsing"; + return false; + } + return true; +} + +bool TDecoderBase::Read(const ui32 columnIdx, TInstant& result, const Ydb::Value& r) const { + auto& pValue = r.items()[columnIdx]; + if (pValue.has_uint32_value()) { + result = TInstant::Seconds(pValue.uint32_value()); + } else if (pValue.has_int64_value()) { + result = TInstant::Seconds(pValue.int64_value()); + } else if (pValue.has_uint64_value()) { + result = TInstant::Seconds(pValue.uint64_value()); + } else if (pValue.has_int32_value()) { + result = TInstant::Seconds(pValue.int32_value()); + } else { + ALS_WARN(0) << "incorrect type for instant seconds parsing"; + return false; + } + return true; +} + bool TDecoderBase::ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const { const TString& s = r.items()[columnIdx].bytes_value(); if (!::google::protobuf::TextFormat::ParseFromString(s, &result)) { diff --git a/ydb/core/tx/tiering/decoder.h b/ydb/core/tx/tiering/decoder.h index 6866feedebb..587f4ae9da8 100644 --- a/ydb/core/tx/tiering/decoder.h +++ b/ydb/core/tx/tiering/decoder.h @@ -11,6 +11,8 @@ public: bool Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const; bool ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const; bool Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const; + bool Read(const ui32 columnIdx, TInstant& result, const Ydb::Value& r) const; + bool Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) const; }; } diff --git a/ydb/services/CMakeLists.txt b/ydb/services/CMakeLists.txt index da7cf033015..f21cbb406ca 100644 --- a/ydb/services/CMakeLists.txt +++ b/ydb/services/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(auth) +add_subdirectory(bg_tasks) add_subdirectory(cms) add_subdirectory(datastreams) add_subdirectory(discovery) diff --git a/ydb/services/bg_tasks/CMakeLists.txt b/ydb/services/bg_tasks/CMakeLists.txt new file mode 100644 index 00000000000..500fe426388 --- /dev/null +++ b/ydb/services/bg_tasks/CMakeLists.txt @@ -0,0 +1,25 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(abstract) +add_subdirectory(ds_table) +add_subdirectory(protos) +add_subdirectory(ut) + +add_library(ydb-services-bg_tasks) +target_link_libraries(ydb-services-bg_tasks PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-metadata-abstract + services-bg_tasks-abstract + services-bg_tasks-protos +) +target_sources(ydb-services-bg_tasks PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/service.cpp +) diff --git a/ydb/services/bg_tasks/abstract/CMakeLists.txt b/ydb/services/bg_tasks/abstract/CMakeLists.txt new file mode 100644 index 00000000000..871806f32db --- /dev/null +++ b/ydb/services/bg_tasks/abstract/CMakeLists.txt @@ -0,0 +1,27 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-bg_tasks-abstract) +target_link_libraries(services-bg_tasks-abstract PUBLIC + contrib-libs-cxxsupp + yutil + ydb-library-accessor + cpp-actors-core + api-protos + services-bg_tasks-protos + ydb-core-base +) +target_sources(services-bg_tasks-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/common.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/interface.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/scheduler.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/activity.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/task.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/abstract/state.cpp +) diff --git a/ydb/services/bg_tasks/abstract/activity.cpp b/ydb/services/bg_tasks/abstract/activity.cpp new file mode 100644 index 00000000000..ecca1f20fd7 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/activity.cpp @@ -0,0 +1,5 @@ +#include "activity.h" + +namespace NKikimr::NBackgroundTasks { + +} diff --git a/ydb/services/bg_tasks/abstract/activity.h b/ydb/services/bg_tasks/abstract/activity.h new file mode 100644 index 00000000000..b67b5797f99 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/activity.h @@ -0,0 +1,55 @@ +#pragma once +#include "interface.h" +#include "state.h" + +#include <ydb/core/base/events.h> + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/object_factory/object_factory.h> + +namespace NKikimr::NBackgroundTasks { + +class ITaskExecutorController { +protected: + virtual void DoTaskInterrupted(ITaskState::TPtr taskState) = 0; + virtual void DoTaskFinished() = 0; + +public: + using TPtr = std::shared_ptr<ITaskExecutorController>; + virtual ~ITaskExecutorController() = default; + + void TaskInterrupted(ITaskState::TPtr taskState) { + return DoTaskInterrupted(taskState); + } + virtual void TaskFinished() { + return DoTaskFinished(); + } +}; + +class ITaskActivity: public IStringSerializable { +public: + using TPtr = std::shared_ptr<ITaskActivity>; + using TFactory = NObjectFactory::TObjectFactory<ITaskActivity, TString>; +protected: + virtual void DoExecute(ITaskExecutorController::TPtr controller, const TTaskStateContainer& state) = 0; + virtual void DoFinished(const TTaskStateContainer& /*state*/) { + + } +public: + virtual TString GetClassName() const = 0; + void Execute(ITaskExecutorController::TPtr controller, const TTaskStateContainer& state) { + DoExecute(controller, state); + }; + void Finished(const TTaskStateContainer& state) { + DoFinished(state); + }; +}; + +class TTaskActivityContainer: public TInterfaceStringContainer<ITaskActivity> { +private: + using TBase = TInterfaceStringContainer<ITaskActivity>; +public: + using TBase::TBase; +}; + +} diff --git a/ydb/services/bg_tasks/abstract/common.cpp b/ydb/services/bg_tasks/abstract/common.cpp new file mode 100644 index 00000000000..490f24b3827 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/common.cpp @@ -0,0 +1,5 @@ +#include "common.h" + +namespace NKikimr::NBackgroundTasks { + +} diff --git a/ydb/services/bg_tasks/abstract/common.h b/ydb/services/bg_tasks/abstract/common.h new file mode 100644 index 00000000000..4c447c1626b --- /dev/null +++ b/ydb/services/bg_tasks/abstract/common.h @@ -0,0 +1,27 @@ +#pragma once +#include <ydb/core/base/events.h> + +#include <library/cpp/actors/core/events.h> + +namespace NKikimr::NBackgroundTasks { + +enum EEvents { + EvAddTask = EventSpaceBegin(TKikimrEvents::ES_BACKGROUND_TASKS), + EvStartAssign, + EvAssignFinished, + EvFetchingFinished, + EvTaskFinished, + EvTaskInterrupted, + EvTaskFetched, + EvTaskExecutorFinished, + EvUpdateTaskEnabled, + EvAddTaskResult, + EvUpdateTaskEnabledResult, + EvLockPingerStart, + EvLockPingerFinished, + EvEnd +}; + +static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_BACKGROUND_TASKS), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_BACKGROUND_TASKS)"); + +} diff --git a/ydb/services/bg_tasks/abstract/interface.cpp b/ydb/services/bg_tasks/abstract/interface.cpp new file mode 100644 index 00000000000..a0587cc5830 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/interface.cpp @@ -0,0 +1,5 @@ +#include "interface.h" + +namespace NKikimr::NBackgroundTasks { + +} diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h new file mode 100644 index 00000000000..65065963329 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/interface.h @@ -0,0 +1,290 @@ +#pragma once +#include <ydb/core/base/events.h> +#include <ydb/services/bg_tasks/protos/container.pb.h> + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/object_factory/object_factory.h> +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/json/json_reader.h> +#include <library/cpp/actors/core/log.h> + +namespace NKikimr::NBackgroundTasks { + +class IStringSerializable { +protected: + virtual bool DoDeserializeFromString(const TString& data) = 0; + virtual TString DoSerializeToString() const = 0; +public: + bool DeserializeFromString(const TString& data) { + return DoDeserializeFromString(data); + } + + TString SerializeToString() const { + return DoSerializeToString(); + } + virtual ~IStringSerializable() = default; +}; + +template <class TBaseClass = IStringSerializable> +class IJsonStringSerializable: public TBaseClass { +protected: + virtual bool DoDeserializeFromString(const TString& data) override final { + std::optional<NJson::TJsonValue> jsonData = ParseStringToStorageObject(data); + if (!jsonData) { + return false; + } + return DeserializeFromJson(*jsonData); + } + virtual TString DoSerializeToString() const override final { + NJson::TJsonValue jsonData = SerializeToJson(); + return jsonData.GetStringRobust(); + } + virtual NJson::TJsonValue DoSerializeToJson() const = 0; + virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonData) = 0; +public: + static std::optional<NJson::TJsonValue> ParseStringToStorageObject(const TString& data) { + NJson::TJsonValue jsonData; + if (!NJson::ReadJsonFastTree(data, &jsonData)) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as json: " << Base64Encode(data); + return {}; + } + return jsonData; + } + static NJson::TJsonValue SerializeToStorageObject(const IJsonStringSerializable& object) { + return object.SerializeToJson(); + } + NJson::TJsonValue SerializeToJson() const { + return DoSerializeToJson(); + } + bool DeserializeFromJson(const NJson::TJsonValue& jsonData) { + return DoDeserializeFromJson(jsonData); + } +}; + +template <class TProtoClass, class TBaseClass = IStringSerializable> +class IProtoStringSerializable: public TBaseClass { +private: + using TSelf = IProtoStringSerializable<TProtoClass, TBaseClass>; +protected: + virtual bool DoDeserializeFromString(const TString& data) override final { + std::optional<TProtoClass> protoData = ParseStringToStorageObject(data); + if (!protoData) { + return false; + } + return DeserializeFromProto(*protoData); + } + virtual TString DoSerializeToString() const override final { + TProtoClass protoData = SerializeToProto(); + return protoData.SerializeAsString(); + } + virtual TProtoClass DoSerializeToProto() const = 0; + virtual bool DoDeserializeFromProto(const TProtoClass& protoData) = 0; +public: + static std::optional<TProtoClass> ParseStringToStorageObject(const TString& data) { + TProtoClass protoData; + if (!protoData.ParseFromArray(data.data(), data.size())) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as proto: " << Base64Encode(data); + return {}; + } + return protoData; + } + static TProtoClass SerializeToStorageObject(const IProtoStringSerializable& object) { + return object.SerializeToProto(); + } + TProtoClass SerializeToProto() const { + return DoSerializeToProto(); + } + bool DeserializeFromProto(const TProtoClass& protoData) { + return DoDeserializeFromProto(protoData); + } +}; + +template <class IInterface> +class TCommonInterfaceContainer { +protected: + std::shared_ptr<IInterface> Object; + using TFactory = typename IInterface::TFactory; +public: + TCommonInterfaceContainer() = default; + TCommonInterfaceContainer(std::shared_ptr<IInterface> object) + : Object(object) { + + } + + bool HasObject() const { + return !!Object; + } + + template <class T> + const T& GetAsSafe() const { + auto result = std::dynamic_pointer_cast<T>(Object); + Y_VERIFY(!!result); + return *result; + } + + template <class T> + T& GetAsSafe() { + auto result = std::dynamic_pointer_cast<T>(Object); + Y_VERIFY(!!result); + return *result; + } + + std::shared_ptr<IInterface> GetObjectPtr() const { + return Object; + } + + const IInterface* operator->() const { + return Object.get(); + } + + IInterface* operator->() { + return Object.get(); + } + + bool operator!() const { + return !Object; + } + +}; + +template <class IInterface> +class TInterfaceStringContainer: public TCommonInterfaceContainer<IInterface> { +protected: + using TBase = TCommonInterfaceContainer<IInterface>; + using TFactory = typename TBase::TFactory; + using TBase::Object; +public: + using TBase::TBase; + TString SerializeToStringBase64() const { + return Base64Encode(SerializeToString()); + } + + TString SerializeToString() const { + NKikimrProto::TStringContainer result; + if (!Object) { + return result.SerializeAsString(); + } + result.SetClassName(Object->GetClassName()); + result.SetBinaryData(Object->SerializeToString()); + return result.SerializeAsString(); + } + + bool DeserializeFromString(const TString& data) { + if (!data) { + Object = nullptr; + return true; + } + NKikimrProto::TStringContainer protoData; + if (!protoData.ParseFromArray(data.data(), data.size())) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse string as proto: " << Base64Encode(data); + return {}; + } + const TString& className = protoData.GetClassName(); + std::shared_ptr<IInterface> object(TFactory::Construct(protoData.GetClassName())); + if (!object) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name(); + return false; + } + + if (!object->DeserializeFromString(protoData.GetBinaryData())) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse class instance: " << className << " for " << typeid(IInterface).name(); + return false; + } + Object = object; + return true; + } +}; + +class TDefaultJsonContainerPolicy { +public: + static TString GetClassName(const NJson::TJsonValue& jsonInfo) { + return jsonInfo["className"].GetStringRobust(); + } + static void SetClassName(NJson::TJsonValue& jsonInfo, const TString& className) { + jsonInfo["className"] = className; + } +}; + +template <class IInterface, class TOperatorPolicy = TDefaultJsonContainerPolicy> +class TInterfaceJsonContainer: public TCommonInterfaceContainer<IInterface> { +protected: + using TBase = TCommonInterfaceContainer<IInterface>; + using TFactory = typename TBase::TFactory; + using TBase::Object; +public: + using TBase::TBase; + NJson::TJsonValue SerializeToJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + if (!Object) { + return result; + } + TOperatorPolicy::SetClassName(result, Object->GetClassName()); + result.InsertValue("objectData", Object->SerializeToJson()); + return result; + } + + bool DeserializeFromJson(const NJson::TJsonValue& data) { + const TString& className = TOperatorPolicy::GetClassName(data); + std::shared_ptr<IInterface> object(TFactory::Construct(className)); + if (!object) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name(); + return false; + } + if (!object->DeserializeFromJson(data["objectData"])) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse class instance: " << className << " for " << typeid(IInterface).name(); + return false; + } + Object = object; + return true; + } +}; + +class TDefaultProtoContainerPolicy { +public: + template <class TProto> + static TString GetClassName(const TProto& protoInfo) { + return protoInfo.GetClassName(); + } + template <class TProto> + static void SetClassName(TProto& protoInfo, const TString& className) { + protoInfo.SetClassName(className); + } +}; + +template <class IInterface, class TOperatorPolicy = TDefaultProtoContainerPolicy> +class TInterfaceProtoContainer: public TCommonInterfaceContainer<IInterface> { +private: + using TProto = typename IInterface::TProto; +protected: + using TBase = TCommonInterfaceContainer<IInterface>; + using TFactory = typename TBase::TFactory; + using TBase::Object; +public: + using TBase::TBase; + bool DeserializeFromProto(const TProto& data) { + const TString& className = TOperatorPolicy::GetClassName(data); + std::shared_ptr<IInterface> object(TFactory::Construct(className)); + if (!object) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "incorrect class name: " << className << " for " << typeid(IInterface).name(); + return false; + } + if (!object->DeserializeFromProto(data)) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse class instance: " << className << " for " << typeid(IInterface).name(); + return false; + } + Object = object; + return true; + } + + TProto SerializeToProto() const { + TProto result; + if (!Object) { + return result; + } + result = Object->SerializeToProto(); + TOperatorPolicy::SetClassName(result, Object->GetClassName()); + return result; + } +}; + +} diff --git a/ydb/services/bg_tasks/abstract/scheduler.cpp b/ydb/services/bg_tasks/abstract/scheduler.cpp new file mode 100644 index 00000000000..8c30232e4f8 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/scheduler.cpp @@ -0,0 +1,5 @@ +#include "scheduler.h" + +namespace NKikimr::NBackgroundTasks { + +} diff --git a/ydb/services/bg_tasks/abstract/scheduler.h b/ydb/services/bg_tasks/abstract/scheduler.h new file mode 100644 index 00000000000..84ff7bab728 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/scheduler.h @@ -0,0 +1,54 @@ +#pragma once +#include "interface.h" +#include "state.h" + +#include <ydb/core/base/events.h> + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/object_factory/object_factory.h> + +namespace NKikimr::NBackgroundTasks { + +class ITaskScheduler: public IStringSerializable { +public: + using TPtr = std::shared_ptr<ITaskScheduler>; + using TFactory = NObjectFactory::TObjectFactory<ITaskScheduler, TString>; +protected: + virtual std::optional<TInstant> DoGetNextStartInstant(const TInstant currentStartInstant, const TTaskStateContainer& state) const = 0; + virtual TInstant DoGetStartInstant() const = 0; +public: + virtual TString GetClassName() const = 0; + std::optional<TInstant> GetNextStartInstant(const TInstant currentStartInstant, const TTaskStateContainer& state) const { + return DoGetNextStartInstant(currentStartInstant, state); + } + TInstant GetStartInstant() const { + return DoGetStartInstant(); + } +}; + +class TTaskSchedulerContainer: public TInterfaceStringContainer<ITaskScheduler> { +private: + using TBase = TInterfaceStringContainer<ITaskScheduler>; +public: + using TBase::TBase; + + std::optional<TInstant> GetNextStartInstant(const TInstant currentStartInstant, const TTaskStateContainer& state) const { + if (!Object) { + return TInstant::Zero(); + } + return Object->GetNextStartInstant(currentStartInstant, state); + } + + TInstant GetStartInstant() const { + if (!Object) { + return TInstant::Zero(); + } + return Object->GetStartInstant(); + } +}; + +class IJsonTaskScheduler: public IJsonStringSerializable<ITaskScheduler> { + +}; + +} diff --git a/ydb/services/bg_tasks/abstract/state.cpp b/ydb/services/bg_tasks/abstract/state.cpp new file mode 100644 index 00000000000..5cd85884467 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/state.cpp @@ -0,0 +1,5 @@ +#include "state.h" + +namespace NKikimr::NBackgroundTasks { + +} diff --git a/ydb/services/bg_tasks/abstract/state.h b/ydb/services/bg_tasks/abstract/state.h new file mode 100644 index 00000000000..e4062dd6ecd --- /dev/null +++ b/ydb/services/bg_tasks/abstract/state.h @@ -0,0 +1,25 @@ +#pragma once +#include "interface.h" + +#include <ydb/core/base/events.h> + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/object_factory/object_factory.h> + +namespace NKikimr::NBackgroundTasks { +class ITaskState: public IStringSerializable { +public: + using TPtr = std::shared_ptr<ITaskState>; + using TFactory = NObjectFactory::TObjectFactory<ITaskState, TString>; +public: + virtual TString GetClassName() const = 0; +}; + +class TTaskStateContainer: public TInterfaceStringContainer<ITaskState> { +private: + using TBase = TInterfaceStringContainer<ITaskState>; +public: + using TBase::TBase; +}; + +} diff --git a/ydb/services/bg_tasks/abstract/task.cpp b/ydb/services/bg_tasks/abstract/task.cpp new file mode 100644 index 00000000000..2fbe51fab03 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/task.cpp @@ -0,0 +1,5 @@ +#include "task.h" + +namespace NKikimr::NBackgroundTasks { + +} diff --git a/ydb/services/bg_tasks/abstract/task.h b/ydb/services/bg_tasks/abstract/task.h new file mode 100644 index 00000000000..eb87ea5e1b9 --- /dev/null +++ b/ydb/services/bg_tasks/abstract/task.h @@ -0,0 +1,149 @@ +#pragma once +#include "activity.h" +#include "scheduler.h" +#include "state.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/events.h> +#include <ydb/core/tx/tiering/decoder.h> +#include <ydb/library/accessor/accessor.h> + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/object_factory/object_factory.h> +#include <util/generic/guid.h> + +namespace NKikimr::NBackgroundTasks { + +class TTask; + +class TTask { +private: + YDB_ACCESSOR(TString, Id, TGUID::CreateTimebased().AsGuidString()); + YDB_READONLY_DEF(TString, Class); + YDB_READONLY_DEF(TString, ExecutorId); + YDB_READONLY(TInstant, LastPing, TInstant::Zero()); + YDB_READONLY(TInstant, StartInstant, TInstant::Zero()); + YDB_READONLY(TInstant, ConstructInstant, TAppData::TimeProvider->Now()); + YDB_READONLY_DEF(TTaskActivityContainer, Activity); + YDB_READONLY_DEF(TTaskSchedulerContainer, Scheduler); + YDB_ACCESSOR_DEF(TTaskStateContainer, State); + YDB_FLAG_ACCESSOR(Enabled, true); +public: + TTask() = default; + TTask(ITaskActivity::TPtr activity, ITaskScheduler::TPtr scheduler) + : Activity(activity) + , Scheduler(scheduler) + { + + } + + void Execute(ITaskExecutorController::TPtr controller) const { + if (!Activity) { + controller->TaskFinished(); + } else { + Activity.GetObjectPtr()->Execute(controller, State); + } + } + + void Finished() const { + if (!!Activity) { + Activity.GetObjectPtr()->Finished(State); + } + } + + class TDecoder: public NInternal::TDecoderBase { + private: + YDB_ACCESSOR(i32, IdIdx, -1); + YDB_ACCESSOR(i32, ClassIdx, -1); + YDB_ACCESSOR(i32, ExecutorIdIdx, -1); + YDB_ACCESSOR(i32, LastPingIdx, -1); + YDB_ACCESSOR(i32, StartInstantIdx, -1); + YDB_ACCESSOR(i32, ConstructInstantIdx, -1); + YDB_ACCESSOR(i32, ActivityIdx, -1); + YDB_ACCESSOR(i32, SchedulerIdx, -1); + YDB_ACCESSOR(i32, StateIdx, -1); + YDB_ACCESSOR(i32, EnabledIdx, -1); + public: + static inline const TString Id = "id"; + static inline const TString Class = "class"; + static inline const TString ExecutorId = "executorId"; + static inline const TString LastPing = "lastPing"; + static inline const TString StartInstant = "startInstant"; + static inline const TString ConstructInstant = "constructInstant"; + static inline const TString Activity = "activity"; + static inline const TString Scheduler = "scheduler"; + static inline const TString State = "state"; + static inline const TString Enabled = "enabled"; + + TDecoder(const Ydb::ResultSet& rawData) { + IdIdx = GetFieldIndex(rawData, Id); + ClassIdx = GetFieldIndex(rawData, Class); + ExecutorIdIdx = GetFieldIndex(rawData, ExecutorId); + LastPingIdx = GetFieldIndex(rawData, LastPing); + StartInstantIdx = GetFieldIndex(rawData, StartInstant); + ConstructInstantIdx = GetFieldIndex(rawData, ConstructInstant); + ActivityIdx = GetFieldIndex(rawData, Activity); + SchedulerIdx = GetFieldIndex(rawData, Scheduler); + StateIdx = GetFieldIndex(rawData, State); + EnabledIdx = GetFieldIndex(rawData, Enabled); + } + }; + + bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) { + if (!decoder.Read(decoder.GetIdIdx(), Id, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetClassIdx(), Class, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetEnabledIdx(), EnabledFlag, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetExecutorIdIdx(), ExecutorId, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetLastPingIdx(), LastPing, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetStartInstantIdx(), StartInstant, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetConstructInstantIdx(), ConstructInstant, rawValue)) { + return false; + } + { + TString activityData; + if (!decoder.Read(decoder.GetActivityIdx(), activityData, rawValue)) { + return false; + } + if (!Activity.DeserializeFromString(activityData)) { + return false; + } + if (!Activity) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse task activity"; + return false; + } + } + { + TString schedulerData; + if (!decoder.Read(decoder.GetSchedulerIdx(), schedulerData, rawValue)) { + return false; + } + if (!Scheduler.DeserializeFromString(schedulerData)) { + return false; + } + } + { + TString stateData; + if (!decoder.Read(decoder.GetStateIdx(), stateData, rawValue)) { + return false; + } + if (!State.DeserializeFromString(stateData)) { + return false; + } + } + return true; + } +}; + +} diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.txt new file mode 100644 index 00000000000..5fecb1aa1bc --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/CMakeLists.txt @@ -0,0 +1,35 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-bg_tasks-ds_table) +target_link_libraries(services-bg_tasks-ds_table PUBLIC + contrib-libs-cxxsupp + yutil + ydb-library-accessor + cpp-actors-core + api-protos + services-bg_tasks-abstract + services-metadata-initializer + ydb-core-base + services-metadata-request +) +target_sources(services-bg_tasks-ds_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/interrupt.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/finish_task.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/assign_tasks.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/add_tasks.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_enabled.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/lock_pinger.cpp +) diff --git a/ydb/services/bg_tasks/ds_table/add_tasks.cpp b/ydb/services/bg_tasks/ds_table/add_tasks.cpp new file mode 100644 index 00000000000..9482d0087ef --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/add_tasks.cpp @@ -0,0 +1,79 @@ +#include "add_tasks.h" + +#include <ydb/services/bg_tasks/service.h> + +namespace NKikimr::NBackgroundTasks { + +std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TAddTasksActor::OnSessionId(const TString& sessionId) { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + sb << "DECLARE $activityString AS String;" << Endl; + sb << "DECLARE $taskId AS String;" << Endl; + sb << "DECLARE $schedulerString AS String;" << Endl; + sb << "DECLARE $enabled AS Bool;" << Endl; + sb << "DECLARE $className AS String;" << Endl; + sb << "DECLARE $startInstant AS Uint32;" << Endl; + sb << "DECLARE $constructInstant AS Uint32;" << Endl; + sb << "UPSERT INTO `" + Controller->GetTableName() + "` (id, enabled, class, startInstant, constructInstant, activity, scheduler)" << Endl; + sb << "VALUES" << Endl; + { + sb << "(" << Endl; + sb << "$taskId," << Endl; + sb << "$enabled," << Endl; + sb << "$className," << Endl; + sb << "$startInstant," << Endl; + sb << "$constructInstant," << Endl; + sb << "$activityString," << Endl; + sb << "$schedulerString" << Endl; + sb << ")" << Endl; + } + request.mutable_query()->set_yql_text(sb); + + { + auto& param = (*request.mutable_parameters())["$enabled"]; + param.mutable_value()->set_bool_value(Task.IsEnabled()); + param.mutable_type()->set_type_id(Ydb::Type::BOOL); + } + + { + auto& param = (*request.mutable_parameters())["$constructInstant"]; + param.mutable_value()->set_uint32_value(Task.GetConstructInstant().Seconds()); + param.mutable_type()->set_type_id(Ydb::Type::UINT32); + } + + { + auto& param = (*request.mutable_parameters())["$startInstant"]; + param.mutable_value()->set_uint32_value(Task.GetScheduler().GetStartInstant().Seconds()); + param.mutable_type()->set_type_id(Ydb::Type::UINT32); + } + + { + auto& param = (*request.mutable_parameters())["$className"]; + param.mutable_value()->set_bytes_value(Task.GetClass()); + param.mutable_type()->set_type_id(Ydb::Type::STRING); + } + + auto& idString = (*request.mutable_parameters())["$taskId"]; + idString.mutable_value()->set_bytes_value(Task.GetId()); + idString.mutable_type()->set_type_id(Ydb::Type::STRING); + + auto& aString = (*request.mutable_parameters())["$activityString"]; + aString.mutable_value()->set_bytes_value(Task.GetActivity().SerializeToString()); + aString.mutable_type()->set_type_id(Ydb::Type::STRING); + + auto& sString = (*request.mutable_parameters())["$schedulerString"]; + sString.mutable_value()->set_bytes_value(Task.GetScheduler().SerializeToString()); + sString.mutable_type()->set_type_id(Ydb::Type::STRING); + + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + + return request; +} + +void TAddTasksActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*ev*/) { + Sender<TEvAddTaskResult>(Task.GetId(), true).SendTo(ResultWaiter); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/add_tasks.h b/ydb/services/bg_tasks/ds_table/add_tasks.h new file mode 100644 index 00000000000..ef1e3bdac92 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/add_tasks.h @@ -0,0 +1,27 @@ +#pragma once +#include "executor_controller.h" +#include <ydb/services/bg_tasks/abstract/task.h> +#include <ydb/services/metadata/request/request_actor.h> + +namespace NKikimr::NBackgroundTasks { + +class TAddTasksActor: public NInternal::NRequest::TSessionedActor { +private: + using TBase = NInternal::NRequest::TSessionedActor; + TExecutorController::TPtr Controller; + const TTask Task; + const TActorId ResultWaiter; +protected: + virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& ev) override; + virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override; + +public: + TAddTasksActor(TExecutorController::TPtr controller, const TTask& task, const TActorId resultWaiter) + : TBase(controller->GetRequestConfig()) + , Controller(controller) + , Task(task) + , ResultWaiter(resultWaiter) + { + } +}; +} diff --git a/ydb/services/bg_tasks/ds_table/assign_tasks.cpp b/ydb/services/bg_tasks/ds_table/assign_tasks.cpp new file mode 100644 index 00000000000..03a43a0695a --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/assign_tasks.cpp @@ -0,0 +1,45 @@ +#include "assign_tasks.h" + +namespace NKikimr::NBackgroundTasks { + +std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TAssignTasksActor::OnSessionId(const TString& sessionId) { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + const auto now = TActivationContext::Now(); + sb << "DECLARE $executorId AS String;" << Endl; + sb << "DECLARE $lastPingCriticalBorder AS Uint32;" << Endl; + sb << "DECLARE $lastPingNewValue AS Uint32;" << Endl; + sb << "$ids = (SELECT id FROM `" << Controller->GetTableName() << "`" + << " WHERE (lastPing < $lastPingCriticalBorder" + << " OR executorId IS NULL) AND enabled = true" + << " LIMIT " << TasksCount << ");" << Endl; + sb << "UPSERT INTO `" + Controller->GetTableName() + "`" + << " SELECT id, $executorId as executorId, $lastPingNewValue as lastPing" + << " FROM $ids"; + { + auto& param = (*request.mutable_parameters())["$lastPingCriticalBorder"]; + param.mutable_value()->set_uint32_value((now - Controller->GetConfig().GetPingCheckPeriod()).Seconds()); + param.mutable_type()->set_type_id(Ydb::Type::UINT32); + } + { + auto& param = (*request.mutable_parameters())["$lastPingNewValue"]; + param.mutable_value()->set_uint32_value(now.Seconds()); + param.mutable_type()->set_type_id(Ydb::Type::UINT32); + } + { + auto& param = (*request.mutable_parameters())["$executorId"]; + param.mutable_value()->set_bytes_value(ExecutorId); + param.mutable_type()->set_type_id(Ydb::Type::STRING); + } + request.mutable_query()->set_yql_text(sb); + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + return request; +} + +void TAssignTasksActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*result*/) { + Controller->AssignFinished(); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/assign_tasks.h b/ydb/services/bg_tasks/ds_table/assign_tasks.h new file mode 100644 index 00000000000..640fa39aaee --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/assign_tasks.h @@ -0,0 +1,27 @@ +#pragma once +#include "executor_controller.h" + +#include <ydb/services/metadata/request/request_actor.h> + +namespace NKikimr::NBackgroundTasks { + +class TAssignTasksActor: public NInternal::NRequest::TSessionedActor { +private: + using TBase = NInternal::NRequest::TSessionedActor; + TExecutorController::TPtr Controller; + const ui32 TasksCount; + const TString ExecutorId; + + virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override; + virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override; +public: + TAssignTasksActor(const ui32 tasksCount, TExecutorController::TPtr controller, const TString& executorId) + : TBase(controller->GetRequestConfig()) + , Controller(controller) + , TasksCount(tasksCount) + , ExecutorId(executorId) + { + + } +}; +} diff --git a/ydb/services/bg_tasks/ds_table/config.cpp b/ydb/services/bg_tasks/ds_table/config.cpp new file mode 100644 index 00000000000..0549e811aa5 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/config.cpp @@ -0,0 +1,23 @@ +#include "config.h" +#include <ydb/core/base/appdata.h> + +namespace NKikimr::NBackgroundTasks { + +bool TConfig::DeserializeFromProto(const NKikimrConfig::TBackgroundTasksConfig& config) { + EnabledFlag = config.GetEnabled(); + if (!RequestConfig.DeserializeFromProto(config.GetRequestConfig())) { + return false; + } + PullPeriod = TDuration::Seconds(config.GetPullPeriodSeconds()); + PingPeriod = TDuration::Seconds(config.GetPingPeriodSeconds()); + PingCheckPeriod = TDuration::Seconds(config.GetPingCheckPeriodSeconds()); + MaxInFlight = config.GetMaxInFlight(); + InternalTablePath = config.GetInternalTablePath(); + return true; +} + +TString TConfig::GetTablePath() const { + return AppData()->TenantName + "/" + InternalTablePath; +} + +} diff --git a/ydb/services/bg_tasks/ds_table/config.h b/ydb/services/bg_tasks/ds_table/config.h new file mode 100644 index 00000000000..1d36f5bbc68 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/config.h @@ -0,0 +1,22 @@ +#pragma once +#include <ydb/core/protos/config.pb.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/services/metadata/request/config.h> + +#include <util/datetime/base.h> + +namespace NKikimr::NBackgroundTasks { +class TConfig { +private: + YDB_READONLY_DEF(NInternal::NRequest::TConfig, RequestConfig); + YDB_READONLY(TDuration, PullPeriod, TDuration::Seconds(10)); + YDB_READONLY(TDuration, PingPeriod, TDuration::Seconds(2)); + YDB_READONLY(TDuration, PingCheckPeriod, TDuration::Seconds(20)); + YDB_READONLY(ui32, MaxInFlight, 1); + YDB_READONLY(TString, InternalTablePath, ".bg_tasks/tasks"); + YDB_READONLY_FLAG(Enabled, false); +public: + bool DeserializeFromProto(const NKikimrConfig::TBackgroundTasksConfig& config); + TString GetTablePath() const; +}; +} diff --git a/ydb/services/bg_tasks/ds_table/executor.cpp b/ydb/services/bg_tasks/ds_table/executor.cpp new file mode 100644 index 00000000000..252e8c79521 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/executor.cpp @@ -0,0 +1,143 @@ +#include "add_tasks.h" +#include "assign_tasks.h" +#include "executor.h" +#include "lock_pinger.h" +#include "task_executor.h" +#include "task_enabled.h" +#include "fetch_tasks.h" + +namespace NKikimr::NBackgroundTasks { + +TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TExecutor::BuildModifiers() const { + const TString tableName = Config.GetTablePath(); + TVector<NMetadataProvider::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(tableName); + request.add_primary_key("id"); + { + auto& column = *request.add_columns(); + column.set_name("id"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("enabled"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL); + } + { + auto& column = *request.add_columns(); + column.set_name("class"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("executorId"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("lastPing"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); + } + { + auto& column = *request.add_columns(); + column.set_name("startInstant"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); + } + { + auto& column = *request.add_columns(); + column.set_name("constructInstant"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); + } + { + auto& column = *request.add_columns(); + column.set_name("activity"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("scheduler"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("state"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); + } + return result; +} + +void TExecutor::Handle(TEvStartAssign::TPtr& /*ev*/) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "start assign"; + if (Config.GetMaxInFlight() > CurrentTaskIds.size()) { + Register(new TAssignTasksActor(Config.GetMaxInFlight() - CurrentTaskIds.size(), Controller, ExecutorId)); + } +} + +void TExecutor::Handle(TEvAssignFinished::TPtr& /*ev*/) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "assign finished"; + Register(new TFetchTasksActor(CurrentTaskIds, ExecutorId, Controller)); +} + +void TExecutor::Handle(TEvFetchingFinished::TPtr& /*ev*/) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "assign scheduled: " << Config.GetPullPeriod(); + Schedule(Config.GetPullPeriod(), new TEvStartAssign); +} + +void TExecutor::Handle(TEvLockPingerFinished::TPtr& /*ev*/) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "pinger scheduled: " << Config.GetPingPeriod(); + Schedule(Config.GetPingPeriod(), new TEvLockPingerStart); +} + +void TExecutor::Handle(TEvLockPingerStart::TPtr& /*ev*/) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "pinger start"; + if (CurrentTaskIds.size()) { + Register(new TLockPingerActor(Controller, CurrentTaskIds)); + } else { + Schedule(Config.GetPingPeriod(), new TEvLockPingerStart); + } +} + +void TExecutor::Handle(TEvTaskFetched::TPtr& ev) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "task fetched"; + if (CurrentTaskIds.emplace(ev->Get()->GetTask().GetId()).second) { + Register(new TTaskExecutor(ev->Get()->GetTask(), Controller)); + } +} + +void TExecutor::Handle(TEvTaskExecutorFinished::TPtr& ev) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "task executor finished"; + Y_VERIFY(CurrentTaskIds.contains(ev->Get()->GetTaskId())); + CurrentTaskIds.erase(ev->Get()->GetTaskId()); + Sender<TEvStartAssign>().SendTo(SelfId()); +} + +void TExecutor::Handle(TEvAddTask::TPtr& ev) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "add task"; + Register(new TAddTasksActor(Controller, ev->Get()->GetTask(), ev->Sender)); +} + +void TExecutor::Handle(TEvUpdateTaskEnabled::TPtr& ev) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "start task"; + Register(new TUpdateTaskEnabledActor(Controller, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender)); +} + +void TExecutor::RegisterState() { + Controller = std::make_shared<TExecutorController>(SelfId(), Config); + Become(&TExecutor::StateMain); +} + +void TExecutor::OnInitialized() { + Sender<TEvStartAssign>().SendTo(SelfId()); + Schedule(Config.GetPingPeriod(), new TEvLockPingerStart); +} + +NActors::IActor* CreateService(const TConfig& config) { + return new TExecutor(config); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h new file mode 100644 index 00000000000..1a07c093991 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/executor.h @@ -0,0 +1,102 @@ +#pragma once +#include "config.h" +#include "executor_controller.h" + +#include <ydb/services/bg_tasks/abstract/common.h> +#include <ydb/services/bg_tasks/abstract/task.h> +#include <ydb/services/bg_tasks/service.h> +#include <ydb/services/metadata/initializer/accessor_init.h> + +namespace NKikimr::NBackgroundTasks { + +class TEvLockPingerFinished: public TEventLocal<TEvLockPingerFinished, EEvents::EvLockPingerFinished> { + +}; + +class TEvLockPingerStart: public TEventLocal<TEvLockPingerStart, EEvents::EvLockPingerStart> { + +}; + +class TEvStartAssign: public TEventLocal<TEvStartAssign, EEvents::EvStartAssign> { + +}; + +class TEvAssignFinished: public TEventLocal<TEvAssignFinished, EEvents::EvAssignFinished> { + +}; + +class TEvFetchingFinished: public TEventLocal<TEvFetchingFinished, EEvents::EvFetchingFinished> { + +}; + +class TEvTaskFetched: public TEventLocal<TEvTaskFetched, EEvents::EvTaskFetched> { +private: + YDB_READONLY_DEF(TTask, Task); +public: + TEvTaskFetched(const TTask& task) + : Task(task) + { + + } +}; + +class TEvTaskExecutorFinished: public TEventLocal<TEvTaskExecutorFinished, EEvents::EvTaskExecutorFinished> { +private: + YDB_READONLY_DEF(TString, TaskId); +public: + TEvTaskExecutorFinished(const TString& taskId) + : TaskId(taskId) + { + + } +}; + +class TExecutor: public NMetadataProvider::TDSAccessorInitialized { +private: + using TBase = NMetadataProvider::TDSAccessorInitialized; + TString TableName; + const TString ExecutorId = TGUID::CreateTimebased().AsUuidString(); + const TConfig Config; + std::set<TString> CurrentTaskIds; + TExecutorController::TPtr Controller; +protected: + virtual void RegisterState() override; + virtual void OnInitialized() override; + virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const override; + + void Handle(TEvStartAssign::TPtr& ev); + void Handle(TEvAssignFinished::TPtr& ev); + void Handle(TEvFetchingFinished::TPtr& ev); + void Handle(TEvTaskFetched::TPtr& ev); + void Handle(TEvTaskExecutorFinished::TPtr& ev); + void Handle(TEvAddTask::TPtr& ev); + void Handle(TEvUpdateTaskEnabled::TPtr& ev); + void Handle(TEvLockPingerStart::TPtr& ev); + void Handle(TEvLockPingerFinished::TPtr& ev); + + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvStartAssign, Handle); + hFunc(TEvAssignFinished, Handle); + hFunc(TEvFetchingFinished, Handle); + hFunc(TEvTaskFetched, Handle); + hFunc(TEvAddTask, Handle); + hFunc(TEvTaskExecutorFinished, Handle); + hFunc(TEvLockPingerStart, Handle); + hFunc(TEvLockPingerFinished, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + +public: + TExecutor(const TConfig& config) + : TBase(config.GetRequestConfig()) + , Config(config) + { + } +}; + +IActor* CreateService(const TConfig& config); + +} diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.cpp b/ydb/services/bg_tasks/ds_table/executor_controller.cpp new file mode 100644 index 00000000000..14f9a9ac691 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/executor_controller.cpp @@ -0,0 +1,30 @@ +#include "executor.h" +#include "executor_controller.h" + +#include <ydb/services/bg_tasks/abstract/task.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NKikimr::NBackgroundTasks { + +void TExecutorController::AssignFinished() const { + ExecutorActorId.Send(ExecutorActorId, new TEvAssignFinished()); +} + +void TExecutorController::FetchingFinished() const { + ExecutorActorId.Send(ExecutorActorId, new TEvFetchingFinished()); +} + +void TExecutorController::TaskFetched(const TTask& task) const { + ExecutorActorId.Send(ExecutorActorId, new TEvTaskFetched(task)); +} + +void TExecutorController::TaskFinished(const TString& taskId) const { + ExecutorActorId.Send(ExecutorActorId, new TEvTaskExecutorFinished(taskId)); +} + +void TExecutorController::LockPingerFinished() const { + ExecutorActorId.Send(ExecutorActorId, new TEvLockPingerFinished()); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.h b/ydb/services/bg_tasks/ds_table/executor_controller.h new file mode 100644 index 00000000000..3d3aebb9cd3 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/executor_controller.h @@ -0,0 +1,39 @@ +#pragma once +#include "config.h" + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actorid.h> + +namespace NKikimr::NBackgroundTasks { + +class TTask; + +class TExecutorController { +private: + const NActors::TActorIdentity ExecutorActorId; + YDB_READONLY_DEF(TConfig, Config); +public: + using TPtr = std::shared_ptr<TExecutorController>; + TExecutorController(const NActors::TActorIdentity& executorActorId, const TConfig& config) + : ExecutorActorId(executorActorId) + , Config(config) + { + + } + + TString GetTableName() const { + return Config.GetTablePath(); + } + + const NInternal::NRequest::TConfig& GetRequestConfig() const { + return Config.GetRequestConfig(); + } + + void LockPingerFinished() const; + void TaskFetched(const TTask& task) const; + void TaskFinished(const TString& taskId) const; + void AssignFinished() const; + void FetchingFinished() const; +}; + +} diff --git a/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp b/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp new file mode 100644 index 00000000000..875ab7aaff1 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp @@ -0,0 +1,63 @@ +#include "fetch_tasks.h" + +#include <ydb/services/bg_tasks/abstract/task.h> + +namespace NKikimr::NBackgroundTasks { + +void TFetchTasksActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& currentFullReply) { + Ydb::Table::ExecuteQueryResult qResult; + currentFullReply.operation().result().UnpackTo(&qResult); + Y_VERIFY((size_t)qResult.result_sets().size() == 1); + TTask::TDecoder decoder(qResult.result_sets()[0]); + std::vector<TTask> newTasks; + for (auto&& i : qResult.result_sets()[0].rows()) { + TTask task; + if (!task.DeserializeFromRecord(decoder, i)) { + ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse task record"; + continue; + } + Controller->TaskFetched(task); + } + Controller->FetchingFinished(); +} + +std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TFetchTasksActor::OnSessionId(const TString& sessionId) { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + sb << "DECLARE $executorId AS String;" << Endl; + sb << "DECLARE $lastPingCriticalBorder AS Uint32;" << Endl; + if (CurrentTaskIds.size()) { + sb << "DECLARE $taskIds AS List<String>;" << Endl; + } + sb << "SELECT * FROM `" + Controller->GetTableName() + "`" << Endl; + sb << "WHERE executorId = $executorId" << Endl; + sb << "AND enabled = true" << Endl; + sb << "AND lastPing > $lastPingCriticalBorder" << Endl; + if (CurrentTaskIds.size()) { + sb << " AND id NOT IN ($taskIds)" << Endl; + auto& idStrings = (*request.mutable_parameters())["$taskIds"]; + idStrings.mutable_type()->mutable_list_type(); + for (auto&& i : CurrentTaskIds) { + auto* idString = idStrings.mutable_value()->add_items(); + idString->set_bytes_value(i); + } + } + + { + auto& param = (*request.mutable_parameters())["$executorId"]; + param.mutable_value()->set_bytes_value(ExecutorId); + param.mutable_type()->set_type_id(Ydb::Type::STRING); + } + { + auto& param = (*request.mutable_parameters())["$lastPingCriticalBorder"]; + param.mutable_value()->set_uint32_value((TActivationContext::Now() - Controller->GetConfig().GetPingCheckPeriod()).Seconds()); + param.mutable_type()->set_type_id(Ydb::Type::UINT32); + } + request.mutable_query()->set_yql_text(sb); + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only(); + + return request; +} + +} diff --git a/ydb/services/bg_tasks/ds_table/fetch_tasks.h b/ydb/services/bg_tasks/ds_table/fetch_tasks.h new file mode 100644 index 00000000000..0eea7a2646a --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/fetch_tasks.h @@ -0,0 +1,29 @@ +#pragma once +#include "executor_controller.h" + +#include <ydb/services/metadata/request/request_actor.h> + +namespace NKikimr::NBackgroundTasks { + +class TFetchTasksActor: public NInternal::NRequest::TSessionedActor { +private: + using TBase = NInternal::NRequest::TSessionedActor; + const std::set<TString> CurrentTaskIds; + const TString ExecutorId; + TExecutorController::TPtr Controller; +protected: + virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override; + virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override; + +public: + TFetchTasksActor(const std::set<TString>& currentTaskIds, const TString& executorId, + TExecutorController::TPtr controller) + : TBase(controller->GetRequestConfig()) + , CurrentTaskIds(currentTaskIds) + , ExecutorId(executorId) + , Controller(controller) + { + + } +}; +} diff --git a/ydb/services/bg_tasks/ds_table/finish_task.cpp b/ydb/services/bg_tasks/ds_table/finish_task.cpp new file mode 100644 index 00000000000..874fa90d373 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/finish_task.cpp @@ -0,0 +1,27 @@ +#include "finish_task.h" + +namespace NKikimr::NBackgroundTasks { + +std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TDropTaskActor::OnSessionId(const TString& sessionId) { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + sb << "DECLARE $taskId AS String;" << Endl; + sb << "DELETE FROM `" + Controller->GetTableName() + "` ON SELECT $taskId AS id"; + request.mutable_query()->set_yql_text(sb); + + auto& idString = (*request.mutable_parameters())["$taskId"]; + idString.mutable_value()->set_bytes_value(TaskId); + idString.mutable_type()->set_type_id(Ydb::Type::STRING); + + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + + return request; +} + +void TDropTaskActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*result*/) { + Controller->TaskFinished(TaskId); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/finish_task.h b/ydb/services/bg_tasks/ds_table/finish_task.h new file mode 100644 index 00000000000..2b5020d7774 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/finish_task.h @@ -0,0 +1,26 @@ +#pragma once +#include "executor_controller.h" + +#include <ydb/services/metadata/request/request_actor.h> + +namespace NKikimr::NBackgroundTasks { + +class TDropTaskActor: public NInternal::NRequest::TSessionedActor { +private: + using TBase = NInternal::NRequest::TSessionedActor; + const TString TaskId; + TExecutorController::TPtr Controller; +protected: + virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override; + virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override; + +public: + + TDropTaskActor(const TString& taskId, TExecutorController::TPtr controller) + : TBase(controller->GetRequestConfig()) + , TaskId(taskId) + , Controller(controller) { + + } +}; +} diff --git a/ydb/services/bg_tasks/ds_table/interrupt.cpp b/ydb/services/bg_tasks/ds_table/interrupt.cpp new file mode 100644 index 00000000000..909c9b2e56c --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/interrupt.cpp @@ -0,0 +1,44 @@ +#include "interrupt.h" + +namespace NKikimr::NBackgroundTasks { + +std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TInterruptTaskActor::OnSessionId(const TString& sessionId) { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + sb << "DECLARE $taskId AS String;" << Endl; + sb << "DECLARE $stateString AS String;" << Endl; + sb << "DECLARE $startInstant AS Uint32;" << Endl; + sb << "UPDATE `" + ExecutorController->GetTableName() + "`" << Endl; + sb << "SET lastPing = 0" << Endl; + sb << ", executorId = null" << Endl; + sb << ", startInstant = $startInstant" << Endl; + sb << ", state = $stateString" << Endl; + sb << "WHERE id = $taskId" << Endl; + request.mutable_query()->set_yql_text(sb); + + { + auto& param = (*request.mutable_parameters())["$startInstant"]; + param.mutable_value()->set_uint32_value(NextStartInstant.Seconds()); + param.mutable_type()->set_type_id(Ydb::Type::UINT32); + } + + auto& idString = (*request.mutable_parameters())["$taskId"]; + idString.mutable_value()->set_bytes_value(TaskId); + idString.mutable_type()->set_type_id(Ydb::Type::STRING); + + auto& sString = (*request.mutable_parameters())["$stateString"]; + sString.mutable_value()->set_bytes_value(State.SerializeToString()); + sString.mutable_type()->set_type_id(Ydb::Type::STRING); + + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + + return request; +} + +void TInterruptTaskActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*result*/) { + ExecutorController->TaskFinished(TaskId); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/interrupt.h b/ydb/services/bg_tasks/ds_table/interrupt.h new file mode 100644 index 00000000000..a48008f00ae --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/interrupt.h @@ -0,0 +1,40 @@ +#pragma once +#include "executor_controller.h" + +#include <ydb/services/bg_tasks/abstract/state.h> +#include <ydb/services/metadata/request/request_actor.h> + +namespace NKikimr::NBackgroundTasks { + +class TInterruptTaskActor: public NInternal::NRequest::TSessionedActor { +private: + using TBase = NInternal::NRequest::TSessionedActor; + TExecutorController::TPtr ExecutorController; + const TString TaskId; + const TInstant NextStartInstant; + TTaskStateContainer State; +protected: + virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override; + virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override; + +public: + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogYQLRequest>, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + + TInterruptTaskActor(TExecutorController::TPtr executorController, + const TString& taskId, const TInstant nextStartInstant, TTaskStateContainer state) + : TBase(executorController->GetRequestConfig()) + , ExecutorController(executorController) + , TaskId(taskId) + , NextStartInstant(nextStartInstant) + , State(state) + { + + } +}; +} diff --git a/ydb/services/bg_tasks/ds_table/lock_pinger.cpp b/ydb/services/bg_tasks/ds_table/lock_pinger.cpp new file mode 100644 index 00000000000..17b0be008a8 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/lock_pinger.cpp @@ -0,0 +1,42 @@ +#include "lock_pinger.h" + +#include <ydb/services/bg_tasks/service.h> + +namespace NKikimr::NBackgroundTasks { + +std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TLockPingerActor::OnSessionId(const TString& sessionId) { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + const auto now = TActivationContext::Now(); + sb << "DECLARE $taskIds AS List<String>;" << Endl; + sb << "DECLARE $lastPingNewValue AS Uint32;" << Endl; + sb << "UPDATE `" + ExecutorController->GetTableName() + "`" << Endl; + sb << "SET lastPing = $lastPingNewValue" << Endl; + sb << "WHERE id IN ($taskIds)" << Endl; + request.mutable_query()->set_yql_text(sb); + + { + auto& param = (*request.mutable_parameters())["$lastPingNewValue"]; + param.mutable_value()->set_uint32_value(now.Seconds()); + param.mutable_type()->set_type_id(Ydb::Type::UINT32); + } + + auto& idStrings = (*request.mutable_parameters())["$taskIds"]; + idStrings.mutable_type()->mutable_list_type(); + for (auto&& i : TaskIds) { + auto* idString = idStrings.mutable_value()->add_items(); + idString->set_bytes_value(i); + } + + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + + return request; +} + +void TLockPingerActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*ev*/) { + ExecutorController->LockPingerFinished(); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/lock_pinger.h b/ydb/services/bg_tasks/ds_table/lock_pinger.h new file mode 100644 index 00000000000..8c47f7b3f70 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/lock_pinger.h @@ -0,0 +1,25 @@ +#pragma once +#include "executor_controller.h" + +#include <ydb/services/bg_tasks/abstract/state.h> +#include <ydb/services/metadata/request/request_actor.h> + +namespace NKikimr::NBackgroundTasks { + +class TLockPingerActor: public NInternal::NRequest::TSessionedActor { +private: + using TBase = NInternal::NRequest::TSessionedActor; + const std::set<TString> TaskIds; + TExecutorController::TPtr ExecutorController; +protected: + virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override; + virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override; +public: + TLockPingerActor(TExecutorController::TPtr executorController, const std::set<TString>& taskIds) + : TBase(executorController->GetRequestConfig()) + , TaskIds(taskIds) + , ExecutorController(executorController) { + Y_VERIFY(TaskIds.size()); + } +}; +} diff --git a/ydb/services/bg_tasks/ds_table/task_enabled.cpp b/ydb/services/bg_tasks/ds_table/task_enabled.cpp new file mode 100644 index 00000000000..f56a983eaf0 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/task_enabled.cpp @@ -0,0 +1,31 @@ +#include "task_enabled.h" + +#include <ydb/services/bg_tasks/service.h> + +namespace NKikimr::NBackgroundTasks { + +std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> TUpdateTaskEnabledActor::OnSessionId(const TString& sessionId) { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + sb << "DECLARE $taskId AS String;" << Endl; + sb << "UPDATE `" + ExecutorController->GetTableName() + "`" << Endl; + sb << "SET enabled = " << Enabled << Endl; + sb << "WHERE id = $taskId" << Endl; + request.mutable_query()->set_yql_text(sb); + + auto& idString = (*request.mutable_parameters())["$taskId"]; + idString.mutable_value()->set_bytes_value(TaskId); + idString.mutable_type()->set_type_id(Ydb::Type::STRING); + + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + + return request; +} + +void TUpdateTaskEnabledActor::OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& /*result*/) { + Sender<TEvUpdateTaskEnabledResult>(TaskId, true).SendTo(ResultWaiter); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/task_enabled.h b/ydb/services/bg_tasks/ds_table/task_enabled.h new file mode 100644 index 00000000000..776d561079e --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/task_enabled.h @@ -0,0 +1,32 @@ +#pragma once +#include "executor_controller.h" + +#include <ydb/services/bg_tasks/abstract/state.h> +#include <ydb/services/metadata/request/request_actor.h> + +namespace NKikimr::NBackgroundTasks { + +class TUpdateTaskEnabledActor: public NInternal::NRequest::TSessionedActor { +private: + using TBase = NInternal::NRequest::TSessionedActor; + TExecutorController::TPtr ExecutorController; + const TString TaskId; + const bool Enabled = false; + const TActorId ResultWaiter; +protected: + virtual void OnResult(const NInternal::NRequest::TDialogYQLRequest::TResponse& result) override; + virtual std::optional<NInternal::NRequest::TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override; + +public: + TUpdateTaskEnabledActor(TExecutorController::TPtr executorController, + const TString& taskId, const bool enabled, const TActorId& resultWaiter) + : TBase(executorController->GetRequestConfig()) + , ExecutorController(executorController) + , TaskId(taskId) + , Enabled(enabled) + , ResultWaiter(resultWaiter) + { + + } +}; +} diff --git a/ydb/services/bg_tasks/ds_table/task_executor.cpp b/ydb/services/bg_tasks/ds_table/task_executor.cpp new file mode 100644 index 00000000000..219986f8a61 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/task_executor.cpp @@ -0,0 +1,31 @@ +#include "task_executor.h" +#include "interrupt.h" +#include "finish_task.h" + +namespace NKikimr::NBackgroundTasks { + +void TTaskExecutor::Handle(TEvTaskFinished::TPtr& /*ev*/) { + Register(new TDropTaskActor(Task.GetId(), OwnerController)); + Task.Finished(); + PassAway(); +} + +void TTaskExecutor::Handle(TEvTaskInterrupted::TPtr& ev) { + auto nextStartInstant = Task.GetScheduler().GetNextStartInstant(Task.GetStartInstant(), ev->Get()->GetTaskState()); + if (nextStartInstant) { + Register(new TInterruptTaskActor(OwnerController, Task.GetId(), + *nextStartInstant, ev->Get()->GetTaskState())); + } else { + Register(new TDropTaskActor(Task.GetId(), OwnerController)); + Task.Finished(); + } + PassAway(); +} + +void TTaskExecutor::Bootstrap() { + Become(&TTaskExecutor::StateMain); + Controller = std::make_shared<TTaskExecutorController>(SelfId(), Task.GetId(), OwnerController->GetRequestConfig()); + Task.Execute(Controller); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/task_executor.h b/ydb/services/bg_tasks/ds_table/task_executor.h new file mode 100644 index 00000000000..29f8afdcbc3 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/task_executor.h @@ -0,0 +1,54 @@ +#pragma once +#include "executor_controller.h" + +#include <ydb/services/bg_tasks/abstract/common.h> +#include <ydb/services/bg_tasks/abstract/task.h> +#include <ydb/services/bg_tasks/ds_table/task_executor_controller.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NBackgroundTasks { + +class TEvTaskFinished: public TEventLocal<TEvTaskFinished, EEvents::EvTaskFinished> { +}; + +class TEvTaskInterrupted: public TEventLocal<TEvTaskInterrupted, EEvents::EvTaskInterrupted> { +private: + YDB_READONLY_DEF(ITaskState::TPtr, TaskState); +public: + TEvTaskInterrupted(ITaskState::TPtr taskState) + : TaskState(taskState) + { + + } +}; + +class TTaskExecutor: public NActors::TActorBootstrapped<TTaskExecutor> { +private: + TExecutorController::TPtr OwnerController; + TTaskExecutorController::TPtr Controller; + TTask Task; +public: + TTaskExecutor(const TTask& task, TExecutorController::TPtr controller) + : OwnerController(controller) + , Task(task) + { + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTaskFinished, Handle); + hFunc(TEvTaskInterrupted, Handle); + default: + break; + } + } + + void Bootstrap(); + + void Handle(TEvTaskFinished::TPtr& /*ev*/); + + void Handle(TEvTaskInterrupted::TPtr& ev); +}; + +} diff --git a/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp b/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp new file mode 100644 index 00000000000..68255ef5175 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp @@ -0,0 +1,15 @@ +#include "task_executor.h" +#include "task_executor_controller.h" +#include <ydb/services/bg_tasks/abstract/task.h> + +namespace NKikimr::NBackgroundTasks { + +void TTaskExecutorController::DoTaskInterrupted(ITaskState::TPtr actualTask) { + TaskExecutorId.Send(TaskExecutorId, new TEvTaskInterrupted(actualTask)); +} + +void TTaskExecutorController::DoTaskFinished() { + TaskExecutorId.Send(TaskExecutorId, new TEvTaskFinished()); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/task_executor_controller.h b/ydb/services/bg_tasks/ds_table/task_executor_controller.h new file mode 100644 index 00000000000..2542d0d296e --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/task_executor_controller.h @@ -0,0 +1,28 @@ +#pragma once +#include <ydb/services/metadata/request/config.h> +#include <ydb/services/bg_tasks/abstract/task.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NKikimr::NBackgroundTasks { + +class TTaskExecutorController: public ITaskExecutorController { +private: + const NActors::TActorIdentity TaskExecutorId; + YDB_READONLY_DEF(TString, TaskId); + YDB_READONLY_DEF(NInternal::NRequest::TConfig, RequestConfig); +protected: + virtual void DoTaskInterrupted(ITaskState::TPtr actualTask) override; + virtual void DoTaskFinished() override; +public: + TTaskExecutorController(const NActors::TActorIdentity& executorId, + const TString& taskId, const NInternal::NRequest::TConfig& requestConfig) + : TaskExecutorId(executorId) + , TaskId(taskId) + , RequestConfig(requestConfig) { + + } + +}; + +} diff --git a/ydb/services/bg_tasks/protos/CMakeLists.txt b/ydb/services/bg_tasks/protos/CMakeLists.txt new file mode 100644 index 00000000000..94a58a1da93 --- /dev/null +++ b/ydb/services/bg_tasks/protos/CMakeLists.txt @@ -0,0 +1,31 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-bg_tasks-protos) +target_link_libraries(services-bg_tasks-protos PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(services-bg_tasks-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/protos/container.proto +) +target_proto_addincls(services-bg_tasks-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(services-bg_tasks-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/services/bg_tasks/protos/container.proto b/ydb/services/bg_tasks/protos/container.proto new file mode 100644 index 00000000000..c33822b77d4 --- /dev/null +++ b/ydb/services/bg_tasks/protos/container.proto @@ -0,0 +1,6 @@ +package NKikimrProto; + +message TStringContainer { + optional string ClassName = 1; + optional bytes BinaryData = 2; +} diff --git a/ydb/services/bg_tasks/service.cpp b/ydb/services/bg_tasks/service.cpp new file mode 100644 index 00000000000..9c1b0dec5b0 --- /dev/null +++ b/ydb/services/bg_tasks/service.cpp @@ -0,0 +1,9 @@ +#include "service.h" + +namespace NKikimr::NBackgroundTasks { + +NActors::TActorId MakeServiceId(const ui32 nodeId) { + return NActors::TActorId(nodeId, "SrvcBgrdTask"); +} + +} diff --git a/ydb/services/bg_tasks/service.h b/ydb/services/bg_tasks/service.h new file mode 100644 index 00000000000..0a022c60a56 --- /dev/null +++ b/ydb/services/bg_tasks/service.h @@ -0,0 +1,75 @@ +#pragma once + +#include <ydb/services/bg_tasks/abstract/common.h> +#include <ydb/services/bg_tasks/abstract/task.h> + +#include <library/cpp/actors/core/event_local.h> + +namespace NKikimr::NBackgroundTasks { + +class TEvAddTask: public NActors::TEventLocal<TEvAddTask, EEvents::EvAddTask> { +private: + YDB_READONLY_DEF(TTask, Task); +public: + TEvAddTask(TTask&& task) + : Task(std::move(task)) { + Y_VERIFY(!!Task.GetActivity()); + } +}; + +template <class TDerived, const ui32 evType> +class TEvCommonResult: public NActors::TEventLocal<TDerived, evType> { +private: + YDB_READONLY_DEF(TString, TaskId); + YDB_READONLY_FLAG(Success, false); + YDB_ACCESSOR_DEF(TString, Report); +public: + TEvCommonResult(const TString& taskId, const bool success, const TString& report = Default<TString>()) + : TaskId(taskId) + , SuccessFlag(success) + , Report(report) + { + Y_VERIFY(!!TaskId); + } + + TString GetDebugString() const { + TStringBuilder sb; + sb << "id=" << TaskId << ";"; + sb << "success=" << SuccessFlag << ";"; + if (!SuccessFlag) { + sb << "report=" << Report << ";"; + } + return sb; + } +}; + +class TEvAddTaskResult: public TEvCommonResult<TEvAddTaskResult, EEvents::EvAddTaskResult> { +private: + using TBase = TEvCommonResult<TEvAddTaskResult, EEvents::EvAddTaskResult>; +public: + using TBase::TBase; +}; + +class TEvUpdateTaskEnabled: public NActors::TEventLocal<TEvUpdateTaskEnabled, EEvents::EvUpdateTaskEnabled> { +private: + YDB_READONLY_DEF(TString, TaskId); + YDB_ACCESSOR(bool, Enabled, true); +public: + TEvUpdateTaskEnabled(const TString& taskId, const bool enabled) + : TaskId(taskId) + , Enabled(enabled) + { + Y_VERIFY(!!TaskId); + } +}; + +class TEvUpdateTaskEnabledResult: public TEvCommonResult<TEvUpdateTaskEnabledResult, EEvents::EvUpdateTaskEnabledResult> { +private: + using TBase = TEvCommonResult<TEvUpdateTaskEnabledResult, EEvents::EvUpdateTaskEnabledResult>; +public: + using TBase::TBase; +}; + +NActors::TActorId MakeServiceId(const ui32 nodeId); + +} diff --git a/ydb/services/bg_tasks/ut/CMakeLists.darwin.txt b/ydb/services/bg_tasks/ut/CMakeLists.darwin.txt new file mode 100644 index 00000000000..74df8ceb913 --- /dev/null +++ b/ydb/services/bg_tasks/ut/CMakeLists.darwin.txt @@ -0,0 +1,57 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-bg_tasks-ut) +target_compile_options(ydb-services-bg_tasks-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-bg_tasks-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks +) +target_link_libraries(ydb-services-bg_tasks-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-bg_tasks + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-testlib + public-lib-yson_value + cpp-client-ydb_table + services-bg_tasks-abstract + cpp-testing-unittest + yql-parser-pg_wrapper + yql-sql-pg +) +target_link_options(ydb-services-bg_tasks-ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-services-bg_tasks-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/ut_tasks.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/task_emulator.cpp +) +add_test( + NAME + ydb-services-bg_tasks-ut + COMMAND + ydb-services-bg_tasks-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-services-bg_tasks-ut) diff --git a/ydb/services/bg_tasks/ut/CMakeLists.linux-aarch64.txt b/ydb/services/bg_tasks/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..266c1f57e62 --- /dev/null +++ b/ydb/services/bg_tasks/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,59 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-bg_tasks-ut) +target_compile_options(ydb-services-bg_tasks-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-bg_tasks-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks +) +target_link_libraries(ydb-services-bg_tasks-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + ydb-services-bg_tasks + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-testlib + public-lib-yson_value + cpp-client-ydb_table + services-bg_tasks-abstract + cpp-testing-unittest + yql-parser-pg_wrapper + yql-sql-pg +) +target_link_options(ydb-services-bg_tasks-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-bg_tasks-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/ut_tasks.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/task_emulator.cpp +) +add_test( + NAME + ydb-services-bg_tasks-ut + COMMAND + ydb-services-bg_tasks-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-services-bg_tasks-ut) diff --git a/ydb/services/bg_tasks/ut/CMakeLists.linux.txt b/ydb/services/bg_tasks/ut/CMakeLists.linux.txt new file mode 100644 index 00000000000..f3fe126c7d1 --- /dev/null +++ b/ydb/services/bg_tasks/ut/CMakeLists.linux.txt @@ -0,0 +1,61 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-bg_tasks-ut) +target_compile_options(ydb-services-bg_tasks-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-bg_tasks-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks +) +target_link_libraries(ydb-services-bg_tasks-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-bg_tasks + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + ydb-core-testlib + public-lib-yson_value + cpp-client-ydb_table + services-bg_tasks-abstract + cpp-testing-unittest + yql-parser-pg_wrapper + yql-sql-pg +) +target_link_options(ydb-services-bg_tasks-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-bg_tasks-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/ut_tasks.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ut/task_emulator.cpp +) +add_test( + NAME + ydb-services-bg_tasks-ut + COMMAND + ydb-services-bg_tasks-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-services-bg_tasks-ut) diff --git a/ydb/services/bg_tasks/ut/CMakeLists.txt b/ydb/services/bg_tasks/ut/CMakeLists.txt new file mode 100644 index 00000000000..3e0811fb22e --- /dev/null +++ b/ydb/services/bg_tasks/ut/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/services/bg_tasks/ut/task_emulator.cpp b/ydb/services/bg_tasks/ut/task_emulator.cpp new file mode 100644 index 00000000000..ccf05e7f5a7 --- /dev/null +++ b/ydb/services/bg_tasks/ut/task_emulator.cpp @@ -0,0 +1,50 @@ +#include "task_emulator.h" + +namespace NKikimr { + +TTestInsertTaskActivity::TFactory::TRegistrator<TTestInsertTaskActivity> TTestInsertTaskActivity::Registrator(TTestInsertTaskState::GetClassNameStatic()); +TTestInsertTaskScheduler::TFactory::TRegistrator<TTestInsertTaskScheduler> TTestInsertTaskScheduler::Registrator(TTestInsertTaskState::GetClassNameStatic()); +TTestInsertTaskState::TFactory::TRegistrator<TTestInsertTaskState> TTestInsertTaskState::Registrator(TTestInsertTaskState::GetClassNameStatic()); + +namespace { +TMutex Mutex; +std::map<TString, ui32> CounterSumByActivityId; +std::map<TString, bool> FinishedByActivityId; +} + +void TTestInsertTaskActivity::DoFinished(const NBackgroundTasks::TTaskStateContainer& /*state*/) { + TGuard<TMutex> g(Mutex); + FinishedByActivityId[ActivityTaskId] = true; +} + +void TTestInsertTaskActivity::DoExecute(NBackgroundTasks::ITaskExecutorController::TPtr controller, const NBackgroundTasks::TTaskStateContainer& currentState) { + TGuard<TMutex> g(Mutex); + const ui32 c = currentState.HasObject() ? + currentState.GetAsSafe<TTestInsertTaskState>().GetCounter() : + 0; + Cerr << "TASK EXECUTED: " << c << Endl; + CounterSumByActivityId[ActivityTaskId] += c; + controller->TaskInterrupted(std::make_shared<TTestInsertTaskState>(c + 1)); +} + +bool TTestInsertTaskActivity::IsFinished(const TString& id) { + TGuard<TMutex> g(Mutex); + auto it = FinishedByActivityId.find(id); + if (it == FinishedByActivityId.end()) { + return false; + } else { + return it->second; + } +} + +ui32 TTestInsertTaskActivity::GetCounterSum(const TString& id) { + TGuard<TMutex> g(Mutex); + auto it = CounterSumByActivityId.find(id); + if (it == CounterSumByActivityId.end()) { + return 0; + } else { + return it->second; + } +} + +} diff --git a/ydb/services/bg_tasks/ut/task_emulator.h b/ydb/services/bg_tasks/ut/task_emulator.h new file mode 100644 index 00000000000..4ca71487667 --- /dev/null +++ b/ydb/services/bg_tasks/ut/task_emulator.h @@ -0,0 +1,101 @@ +#include <ydb/services/bg_tasks/abstract/activity.h> +#include <ydb/services/bg_tasks/abstract/task.h> +#include <ydb/services/bg_tasks/service.h> + +#include <util/system/hostname.h> + +namespace NKikimr { + + class TTestInsertTaskState: public NBackgroundTasks::IJsonStringSerializable<NBackgroundTasks::ITaskState> { + private: + static TFactory::TRegistrator<TTestInsertTaskState> Registrator; + + YDB_ACCESSOR(ui32, Counter, 0); + protected: + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("counter", Counter); + return result; + } + virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonData) override { + Counter = jsonData["counter"].GetUIntegerSafe(); + return true; + } + public: + TTestInsertTaskState() = default; + TTestInsertTaskState(const ui32 counter) + : Counter(counter) + { + + } + static TString GetClassNameStatic() { + return "test_insert"; + } + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + }; + + class TTestInsertTaskScheduler: public NBackgroundTasks::IJsonStringSerializable<NBackgroundTasks::ITaskScheduler> { + private: + static TFactory::TRegistrator<TTestInsertTaskScheduler> Registrator; + + YDB_ACCESSOR(ui32, Limit, 3); + protected: + virtual TInstant DoGetStartInstant() const override { + return TInstant::Zero(); + } + + virtual std::optional<TInstant> DoGetNextStartInstant( + const TInstant /*currentStartInstant*/, const NBackgroundTasks::TTaskStateContainer& state) const override { + if (state.GetAsSafe<TTestInsertTaskState>().GetCounter() <= Limit) { + return TInstant::Zero(); + } else { + return {}; + } + } + + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("limit", Limit); + return result; + } + virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonData) override { + Limit = jsonData["limit"].GetUIntegerSafe(); + return true; + } + public: + TTestInsertTaskScheduler() = default; + virtual TString GetClassName() const override { + return TTestInsertTaskState::GetClassNameStatic(); + } + }; + + class TTestInsertTaskActivity: public NBackgroundTasks::IJsonStringSerializable<NBackgroundTasks::ITaskActivity> { + private: + static TFactory::TRegistrator<TTestInsertTaskActivity> Registrator; + YDB_READONLY(TString, ActivityTaskId, TGUID::CreateTimebased().AsUuidString()); + + protected: + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("id", ActivityTaskId); + return result; + } + virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonData) override { + ActivityTaskId = jsonData["id"].GetStringRobust(); + return true; + } + virtual void DoFinished(const NBackgroundTasks::TTaskStateContainer& /*state*/) override; + + virtual void DoExecute(NBackgroundTasks::ITaskExecutorController::TPtr controller, + const NBackgroundTasks::TTaskStateContainer& currentState) override; + public: + static bool IsFinished(const TString& id); + static ui32 GetCounterSum(const TString& id); + virtual TString GetClassName() const override { + return TTestInsertTaskState::GetClassNameStatic(); + } + }; +} + diff --git a/ydb/services/bg_tasks/ut/ut_tasks.cpp b/ydb/services/bg_tasks/ut/ut_tasks.cpp new file mode 100644 index 00000000000..5e603467fb1 --- /dev/null +++ b/ydb/services/bg_tasks/ut/ut_tasks.cpp @@ -0,0 +1,70 @@ +#include "task_emulator.h" +#include <ydb/core/testlib/test_client.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/services/bg_tasks/abstract/activity.h> +#include <ydb/services/bg_tasks/abstract/task.h> +#include <ydb/services/bg_tasks/service.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> +#include <util/system/hostname.h> + +namespace NKikimr { + +Y_UNIT_TEST_SUITE(BGTaskTests) { + Y_UNIT_TEST(DSRunTask) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(false) + .SetEnableBackgroundTasks(true) + .SetEnableOlapSchemaOperations(true); + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_INFO); + runtime.SetLogPriority(NKikimrServices::METADATA_PROVIDER, NLog::PRI_INFO); + runtime.SetLogPriority(NKikimrServices::BG_TASKS, NLog::PRI_DEBUG); + // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG); + runtime.SimulateSleep(TDuration::Seconds(1)); + Cerr << "Initialization finished" << Endl; + + TString activityId; + { + std::shared_ptr<TTestInsertTaskActivity> tActivity(new TTestInsertTaskActivity); + activityId = tActivity->GetActivityTaskId(); + std::shared_ptr<TTestInsertTaskScheduler> tScheduler(new TTestInsertTaskScheduler); + NBackgroundTasks::TTask newTask(tActivity, tScheduler); + runtime.SendAsync(new IEventHandle(NBackgroundTasks::MakeServiceId(1), {}, new NBackgroundTasks::TEvAddTask(std::move(newTask)))); + } + TDispatchOptions rmReady; + rmReady.CustomFinalCondition = [activityId] { + Y_VERIFY(TTestInsertTaskActivity::GetCounterSum(activityId) <= 6); + if (TTestInsertTaskActivity::IsFinished(activityId)) { + Y_VERIFY(TTestInsertTaskActivity::GetCounterSum(activityId) == 6); + return true; + } else { + Cerr << "COUNTER_SUM:" << TTestInsertTaskActivity::GetCounterSum(activityId) << Endl; + } + return false; + }; + Y_VERIFY(runtime.DispatchEvents(rmReady, TDuration::Seconds(30))); + } +} +} diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp index 80ded24e461..195c700d419 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.cpp +++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp @@ -18,9 +18,7 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetTables().size()); *CurrentSelection.mutable_result_sets() = std::move(*qResult.mutable_result_sets()); auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(CurrentSelection, RequestedActuality); - if (!!parsedSnapshot) { - CurrentSnapshot = parsedSnapshot; - } else { + if (!parsedSnapshot) { ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot"; } @@ -36,6 +34,8 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { } } ); + } else { + Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); } } @@ -80,9 +80,14 @@ void TDSAccessorRefresher::OnInitialized() { } TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor) - : TBase(config, snapshotConstructor->GetTableSchema()) + : TBase(config.GetRequestConfig()) , SnapshotConstructor(snapshotConstructor) + , Config(config) { } +TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TDSAccessorRefresher::BuildModifiers() const { + return SnapshotConstructor->GetTableSchema(); +} + } diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h index 7d883893a83..39da63b99a1 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.h +++ b/ydb/services/metadata/ds_table/accessor_refresh.h @@ -39,12 +39,14 @@ private: YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot); YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection); TInstant RequestedActuality = TInstant::Zero(); + const TConfig Config; protected: bool IsReady() const { return !!CurrentSnapshot; } virtual void OnInitialized() override; virtual void OnSnapshotModified() = 0; + virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const override; public: using TBase::Handle; diff --git a/ydb/services/metadata/ds_table/config.h b/ydb/services/metadata/ds_table/config.h index 372f446771c..228b3e637b6 100644 --- a/ydb/services/metadata/ds_table/config.h +++ b/ydb/services/metadata/ds_table/config.h @@ -14,7 +14,6 @@ private: public: TConfig() = default; - TDuration GetRetryPeriod(const ui32 retry) const; bool DeserializeFromProto(const NKikimrConfig::TMetadataProviderConfig& config); }; } diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp index 99cf9a5a698..7536ff85f74 100644 --- a/ydb/services/metadata/initializer/accessor_init.cpp +++ b/ydb/services/metadata/initializer/accessor_init.cpp @@ -5,28 +5,36 @@ namespace NKikimr::NMetadataProvider { void TDSAccessorInitialized::Bootstrap() { RegisterState(); - Modifiers.front()->Execute(SelfId(), Config.GetRequestConfig()); + auto modifiers = BuildModifiers(); + for (auto&& i : modifiers) { + Modifiers.emplace_back(i); + } + if (Modifiers.size()) { + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "modifiers count: " << Modifiers.size(); + Modifiers.front()->Execute(SelfId(), Config); + } else { + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "initialization finished"; + OnInitialized(); + } } void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) { + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "modifiers count: " << Modifiers.size(); if (Modifiers.empty()) { return; } Modifiers.pop_front(); if (Modifiers.size()) { - Modifiers.front()->Execute(SelfId(), Config.GetRequestConfig()); + Modifiers.front()->Execute(SelfId(), Config); } else { + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "initialization finished"; OnInitialized(); } } -TDSAccessorInitialized::TDSAccessorInitialized(const TConfig& config, const TVector<ITableModifier::TPtr>& modifiers) +TDSAccessorInitialized::TDSAccessorInitialized(const NInternal::NRequest::TConfig& config) : Config(config) { - Y_VERIFY(modifiers.size()); - for (auto&& i : modifiers) { - Modifiers.emplace_back(i); - } } } diff --git a/ydb/services/metadata/initializer/accessor_init.h b/ydb/services/metadata/initializer/accessor_init.h index d278408ae2e..b7086c12fef 100644 --- a/ydb/services/metadata/initializer/accessor_init.h +++ b/ydb/services/metadata/initializer/accessor_init.h @@ -13,13 +13,14 @@ namespace NKikimr::NMetadataProvider { class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInitialized> { private: TDeque<ITableModifier::TPtr> Modifiers; + const NInternal::NRequest::TConfig Config; protected: - const TConfig& Config; virtual void RegisterState() = 0; virtual void OnInitialized() = 0; + virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const = 0; public: void Bootstrap(); - TDSAccessorInitialized(const TConfig& config, const TVector<ITableModifier::TPtr>& modifiers); + TDSAccessorInitialized(const NInternal::NRequest::TConfig& config); void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev); STATEFN(StateMain) { diff --git a/ydb/services/metadata/request/common.h b/ydb/services/metadata/request/common.h index eb7100c9389..9fb0a8b5f2b 100644 --- a/ydb/services/metadata/request/common.h +++ b/ydb/services/metadata/request/common.h @@ -18,10 +18,15 @@ enum EEvents { EvSelectRequest, EvSelectInternalResponse, EvSelectResponse, + EvYQLRequest, + EvYQLInternalResponse, + EvGeneralYQLResponse, EvCreateSessionRequest, EvCreateSessionInternalResponse, EvCreateSessionResponse, EvRequestFinished, + EvRequestFailed, + EvRequestStart, EvEnd }; diff --git a/ydb/services/metadata/request/request_actor.h b/ydb/services/metadata/request/request_actor.h index a5bf6064abc..a6cc0a51641 100644 --- a/ydb/services/metadata/request/request_actor.h +++ b/ydb/services/metadata/request/request_actor.h @@ -2,8 +2,6 @@ #include "common.h" #include "config.h" -#include <library/cpp/actors/core/actor_virtual.h> -#include <library/cpp/actors/core/av_bootstrapped.h> #include <library/cpp/actors/core/log.h> #include <ydb/core/base/appdata.h> #include <ydb/core/grpc_services/base/base.h> @@ -29,6 +27,16 @@ using TDialogSelect = TDialogPolicyImpl<Ydb::Table::ExecuteDataQueryRequest, Ydb using TDialogCreateSession = TDialogPolicyImpl<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse, EEvents::EvCreateSessionRequest, EEvents::EvCreateSessionInternalResponse, EEvents::EvCreateSessionResponse>; +template <ui32 evResult = EEvents::EvGeneralYQLResponse> +using TCustomDialogYQLRequest = TDialogPolicyImpl<Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse, + EEvents::EvYQLRequest, EEvents::EvYQLInternalResponse, evResult>; +template <ui32 evResult = EEvents::EvCreateSessionResponse> +using TCustomDialogCreateSpecialSession = TDialogPolicyImpl<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse, + EEvents::EvCreateSessionRequest, EEvents::EvCreateSessionInternalResponse, evResult>; + +using TDialogYQLRequest = TCustomDialogYQLRequest<EEvents::EvGeneralYQLResponse>; +using TDialogCreateSpecialSession = TCustomDialogCreateSpecialSession<EEvents::EvCreateSessionResponse>; + template <class TDialogPolicy> class TEvRequestResult: public NActors::TEventLocal<TEvRequestResult<TDialogPolicy>, TDialogPolicy::EvResult> { private: @@ -42,7 +50,14 @@ public: class TEvRequestFinished: public NActors::TEventLocal<TEvRequestFinished, EEvents::EvRequestFinished> { public: - TEvRequestFinished() = default; +}; + +class TEvRequestStart: public NActors::TEventLocal<TEvRequestStart, EEvents::EvRequestStart> { +public: +}; + +class TEvRequestFailed: public NActors::TEventLocal<TEvRequestFailed, EEvents::EvRequestFailed> { +public: }; template <class TResponse> @@ -71,6 +86,7 @@ private: using TSelf = TYDBRequest<TDialogPolicy>; TRequest ProtoRequest; const NActors::TActorId ActorFinishId; + const NActors::TActorId ActorRestartId; const TConfig& Config; ui32 Retry = 0; protected: @@ -83,10 +99,6 @@ protected: } }; - class TEvRequestStart: public NActors::TEventLocal<TEvRequestStart, TDialogPolicy::EvStart> { - public: - }; - public: void Bootstrap(const TActorContext& /*ctx*/) { TBase::Become(&TBase::TThis::StateMain); @@ -104,14 +116,22 @@ public: void Handle(typename TEvRequestInternalResult::TPtr& ev) { if (!ev->Get()->GetFuture().HasValue() || ev->Get()->GetFuture().HasException()) { ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive result on initialization"; - TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); + if (ActorRestartId) { + TBase::template Sender<TEvRequestFailed>().SendTo(ActorRestartId); + } else { + TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); + } return; } auto f = ev->Get()->GetFuture(); TResponse response = f.ExtractValue(); if (!TOperatorChecker<TResponse>::IsSuccess(response)) { ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "incorrect reply: " << response.DebugString(); - TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); + if (ActorRestartId) { + TBase::template Sender<TEvRequestFailed>().SendTo(ActorRestartId); + } else { + TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); + } return; } TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId); @@ -131,13 +151,79 @@ public: result.Subscribe(replyCallback); } - TYDBRequest(const TRequest& request, const NActors::TActorId actorFinishId, const TConfig& config) + TYDBRequest(const TRequest& request, const NActors::TActorId actorFinishId, const TConfig& config, const NActors::TActorId& actorRestartId = {}) : ProtoRequest(request) , ActorFinishId(actorFinishId) + , ActorRestartId(actorRestartId) , Config(config) { } }; +template <class TDialogPolicy> +class TSessionedActorImpl: public NActors::TActorBootstrapped<TSessionedActorImpl<TDialogPolicy>> { +private: + ui32 Retry = 0; + const TActorId FinishedActorId; + + static_assert(!std::is_same<TDialogPolicy, TDialogCreateSession>()); + using TBase = NActors::TActorBootstrapped<TSessionedActorImpl<TDialogPolicy>>; + void Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) { + Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult(); + Ydb::Table::CreateSessionResult session; + currentFullReply.operation().result().UnpackTo(&session); + const TString sessionId = session.session_id(); + Y_VERIFY(sessionId); + std::optional<typename TDialogPolicy::TRequest> nextRequest = OnSessionId(sessionId); + Y_VERIFY(nextRequest); + TBase::Register(new TYDBRequest<TDialogPolicy>(*nextRequest, TBase::SelfId(), Config, TBase::SelfId())); + } +protected: + const NInternal::NRequest::TConfig Config; + virtual std::optional<typename TDialogPolicy::TRequest> OnSessionId(const TString& sessionId) = 0; + virtual void OnResult(const typename TDialogPolicy::TResponse& response) = 0; +public: + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvRequestResult<TDialogCreateSession>, Handle); + hFunc(TEvRequestFailed, Handle); + hFunc(TEvRequestStart, Handle); + hFunc(TEvRequestResult<TDialogPolicy>, Handle); + default: + break; + } + } + + TSessionedActorImpl(const NInternal::NRequest::TConfig& config) + : Config(config) + { + + } + + void Handle(typename TEvRequestResult<TDialogPolicy>::TPtr& ev) { + OnResult(ev->Get()->GetResult()); + TBase::PassAway(); + } + + void Handle(typename TEvRequestFailed::TPtr& /*ev*/) { + TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); + } + + void Handle(typename TEvRequestFinished::TPtr& /*ev*/) { + Retry = 0; + } + + void Handle(typename TEvRequestStart::TPtr& /*ev*/) { + TBase::Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), TBase::SelfId(), Config, TBase::SelfId())); + } + + void Bootstrap() { + TBase::Become(&TSessionedActorImpl::StateMain); + TBase::template Sender<TEvRequestStart>().SendTo(TBase::SelfId()); + } +}; + +using TSessionedActor = TSessionedActorImpl<TDialogYQLRequest>; + } |