diff options
author | beloshabskiy <beloshabskiy@yandex-team.ru> | 2022-02-10 16:51:13 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:51:13 +0300 |
commit | 6b612367cdcddbe5750d116472ea2b621e6b39fa (patch) | |
tree | 7ed9159b401b0c92bfcc0ac0d9bb3504b8144711 | |
parent | b2519d17aba308e7ddee5687eeb85103642988eb (diff) | |
download | ydb-6b612367cdcddbe5750d116472ea2b621e6b39fa.tar.gz |
Restoring authorship annotation for <beloshabskiy@yandex-team.ru>. Commit 1 of 2.
69 files changed, 2014 insertions, 2014 deletions
diff --git a/build/rules/kikimr.policy b/build/rules/kikimr.policy index 5bb6426e30..0a0d85e495 100644 --- a/build/rules/kikimr.policy +++ b/build/rules/kikimr.policy @@ -15,7 +15,7 @@ ALLOW solomon/ -> kikimr/core/protos ALLOW solomon/ -> kikimr/public/lib/deprecated/kicli ALLOW solomon/ -> kikimr/library/mkql_proto/protos ALLOW solomon/ -> kikimr/yq/libs/config/protos -ALLOW solomon/ -> kikimr/library/folder_service/proto +ALLOW solomon/ -> kikimr/library/folder_service/proto ALLOW solomon/ -> kikimr/library/login/protos # temporary (FIXME: gvit) ALLOW yweb/robot/fetcher/generic_actors -> kikimr/core/protos diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index f5fedfe19b..66cef140eb 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -141,7 +141,7 @@ struct TKikimrEvents : TEvents { ES_SEQUENCEPROXY, // 4217 ES_CLOUD_STORAGE, ES_CLOUD_STORAGE_PRIVATE, - ES_FOLDER_SERVICE_ADAPTER, + ES_FOLDER_SERVICE_ADAPTER, ES_PQ_PARTITION_WRITER, ES_YDB_PROXY, ES_REPLICATION_CONTROLLER, diff --git a/ydb/core/driver_lib/run/factories.h b/ydb/core/driver_lib/run/factories.h index 41a45b44e0..5601d7ee5a 100644 --- a/ydb/core/driver_lib/run/factories.h +++ b/ydb/core/driver_lib/run/factories.h @@ -13,7 +13,7 @@ #include <ydb/library/folder_service/proto/config.pb.h> #include <ydb/library/pdisk_io/aio.h> #include <ydb/core/yq/libs/config/protos/audit.pb.h> - + #include <ydb/library/yql/providers/pq/cm_client/interface/client.h> #include <library/cpp/actors/core/actorsystem.h> diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 819c1478d1..6766787795 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -138,7 +138,7 @@ #include <ydb/library/folder_service/folder_service.h> #include <ydb/library/folder_service/proto/config.pb.h> - + #include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> #include <library/cpp/actors/protos/services_common.pb.h> @@ -2084,9 +2084,9 @@ void TLeaseHolderInitializer::InitializeServices(NActors::TActorSystemSetup* set } } -TSqsServiceInitializer::TSqsServiceInitializer(const TKikimrRunConfig& runConfig, const std::shared_ptr<TModuleFactories>& factories) +TSqsServiceInitializer::TSqsServiceInitializer(const TKikimrRunConfig& runConfig, const std::shared_ptr<TModuleFactories>& factories) : IKikimrServicesInitializer(runConfig) - , Factories(factories) + , Factories(factories) { } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 407ce1bb7b..0bc9d47b5e 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -413,12 +413,12 @@ public: class TSqsServiceInitializer : public IKikimrServicesInitializer { public: - TSqsServiceInitializer(const TKikimrRunConfig& runConfig, const std::shared_ptr<TModuleFactories>& factories); + TSqsServiceInitializer(const TKikimrRunConfig& runConfig, const std::shared_ptr<TModuleFactories>& factories); void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; - -private: - std::shared_ptr<TModuleFactories> Factories; + +private: + std::shared_ptr<TModuleFactories> Factories; }; class TConfigsDispatcherInitializer : public IKikimrServicesInitializer { diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index a4f74aa4e0..956b8b3b26 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1277,7 +1277,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers } if (serviceMask.EnableSqs) { - sil->AddServiceInitializer(new TSqsServiceInitializer(runConfig, ModuleFactories)); + sil->AddServiceInitializer(new TSqsServiceInitializer(runConfig, ModuleFactories)); } if (serviceMask.EnableConfigsDispatcher) { diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index c17c8a7dc3..59638f817b 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -285,7 +285,7 @@ enum EServiceKikimr { STREAMS_STORAGE_SERVICE = 1013; STREAMS_SCHEDULER_SERVICE = 1014; STREAMS_RESOURCE_SERVICE = 1015; - STREAMS_CHECKPOINT_COORDINATOR = 1016; + STREAMS_CHECKPOINT_COORDINATOR = 1016; STREAMS_CONTROL_PLANE_SERVICE = 1017; STREAMS_GRAND_LEADER_SERVICE = 1018; STREAMS_META_STORAGE_SERVICE = 1019; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index d4907c26f1..2f490afe54 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -810,7 +810,7 @@ namespace Tests { &appData, "TestTenant", nullptr, // MakeIntrusive<NPq::NConfigurationManager::TConnections>(), - yqSharedResources, + yqSharedResources, NKikimr::NFolderService::CreateMockFolderServiceActor, NYq::CreateMockYqAuditServiceActor, ydbCredFactory, diff --git a/ydb/core/ymq/actor/serviceid.h b/ydb/core/ymq/actor/serviceid.h index 7219d918ba..37cb78e9f3 100644 --- a/ydb/core/ymq/actor/serviceid.h +++ b/ydb/core/ymq/actor/serviceid.h @@ -35,7 +35,7 @@ IActor* CreateSqsService(TMaybe<ui32> ydbPort = Nothing()); IActor* CreateSqsProxyService(); IActor* CreateSqsAccessService(const TString& address, const TString& pathToRootCA); IActor* CreateSqsFolderService(const TString& address, const TString& pathToRootCA); -IActor* CreateMockSqsFolderService(); +IActor* CreateMockSqsFolderService(); IActor* CreateSqsMeteringService(); } // namespace NKikimr::NSQS diff --git a/ydb/core/yq/libs/actors/logging/log.h b/ydb/core/yq/libs/actors/logging/log.h index a5a938f014..2478711447 100644 --- a/ydb/core/yq/libs/actors/logging/log.h +++ b/ydb/core/yq/libs/actors/logging/log.h @@ -65,17 +65,17 @@ #define LOG_STREAMS_RESOURCE_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_RESOURCE_SERVICE, logRecordStream) #define LOG_STREAMS_RESOURCE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_RESOURCE_SERVICE, logRecordStream) #define LOG_STREAMS_RESOURCE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_RESOURCE_SERVICE, logRecordStream) - -// Component: STREAMS_CHECKPOINT_COORDINATOR. -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) -#define LOG_STREAMS_CHECKPOINT_COORDINATOR_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) + +// Component: STREAMS_CHECKPOINT_COORDINATOR. +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) +#define LOG_STREAMS_CHECKPOINT_COORDINATOR_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) // Component: STREAMS_CONTROL_PLANE_SERVICE. #define LOG_STREAMS_CONTROL_PLANE_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_CONTROL_PLANE_SERVICE, logRecordStream) diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 5549f8f254..14f2703dd0 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -1127,7 +1127,7 @@ private: // out params: gatewaysConfig, clusters); - + TVector<TDataProviderInitializer> dataProvidersInit; const auto dbResolver = std::make_shared<TDatabaseAsyncResolverWithMeta>(TDatabaseAsyncResolverWithMeta(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver, Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId, Params.AuthToken, Params.AccountIdSignatures, Connections)); @@ -1151,7 +1151,7 @@ private: dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory)); } - { + { NYql::TPqGatewayServices pqServices( Params.Driver, Params.PqCmConnections, @@ -1159,10 +1159,10 @@ private: std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()), Params.FunctionRegistry ); - const auto pqGateway = NYql::CreatePqNativeGateway(pqServices); + const auto pqGateway = NYql::CreatePqNativeGateway(pqServices); dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver)); - } - + } + { auto solomonConfig = gatewaysConfig.GetSolomon(); auto solomonGateway = NYql::CreateSolomonGateway(solomonConfig); diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp index 1446cae1bb..e01ba1ca8f 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.cpp +++ b/ydb/core/yq/libs/actors/run_actor_params.cpp @@ -29,7 +29,7 @@ TRunActorParams::TRunActorParams( TVector<YandexQuery::Connection> connections, TVector<YandexQuery::Binding> bindings, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - THashMap<TString, TString> accountIdSignatures, + THashMap<TString, TString> accountIdSignatures, YandexQuery::QueryContent::QueryType queryType, YandexQuery::ExecuteMode executeMode, const TString& resultId, @@ -71,7 +71,7 @@ TRunActorParams::TRunActorParams( , Bindings(std::move(bindings)) , CredentialsFactory(std::move(credentialsFactory)) , AccountIdSignatures(std::move(accountIdSignatures)) - , QueryType(queryType) + , QueryType(queryType) , ExecuteMode(executeMode) , ResultId(resultId) , StateLoadMode(stateLoadMode) diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h index bad5c3c32b..7a6b7436bd 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.h +++ b/ydb/core/yq/libs/actors/run_actor_params.h @@ -43,7 +43,7 @@ struct TRunActorParams { // TODO2 : Change name TVector<YandexQuery::Connection> connections, TVector<YandexQuery::Binding> bindings, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - THashMap<TString, TString> accountIdSignatures, + THashMap<TString, TString> accountIdSignatures, YandexQuery::QueryContent::QueryType queryType, YandexQuery::ExecuteMode executeMode, const TString& resultId, @@ -90,7 +90,7 @@ struct TRunActorParams { // TODO2 : Change name const TVector<YandexQuery::Binding> Bindings; const NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; const THashMap<TString, TString> AccountIdSignatures; - const YandexQuery::QueryContent::QueryType QueryType; + const YandexQuery::QueryContent::QueryType QueryType; const YandexQuery::ExecuteMode ExecuteMode; const TString ResultId; const YandexQuery::StateLoadMode StateLoadMode; diff --git a/ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h b/ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h index 74923fc816..db4dab4fab 100644 --- a/ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h +++ b/ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h @@ -17,8 +17,8 @@ class ICheckpointStorage : public virtual TThrRefBase { public: using TGetCheckpointsResult = std::pair<TCheckpoints, NYql::TIssues>; using TGetCoordinatorsResult = std::pair<TCoordinators, NYql::TIssues>; - using TAddToStateSizeResult = std::pair<ui64, NYql::TIssues>; - using TGetTotalCheckpointsStateSizeResult = std::pair<ui64, NYql::TIssues>; + using TAddToStateSizeResult = std::pair<ui64, NYql::TIssues>; + using TGetTotalCheckpointsStateSizeResult = std::pair<ui64, NYql::TIssues>; using TCreateCheckpointResult = std::pair<TString, NYql::TIssues>; // graphDescId for subsequent usage. virtual NThreading::TFuture<NYql::TIssues> Init() = 0; @@ -50,7 +50,7 @@ public: const TCoordinatorId& coordinator, const TCheckpointId& checkpointId) = 0; - virtual NThreading::TFuture<TGetCheckpointsResult> GetCheckpoints(const TString& graph) = 0; + virtual NThreading::TFuture<TGetCheckpointsResult> GetCheckpoints(const TString& graph) = 0; virtual NThreading::TFuture<TGetCheckpointsResult> GetCheckpoints( const TString& graph, const TVector<ECheckpointStatus>& statuses, ui64 limit, bool loadGraphDescription = false) = 0; @@ -71,12 +71,12 @@ public: const TString& graphId, const TCheckpointId& checkpointUpperBound) = 0; - virtual NThreading::TFuture<ICheckpointStorage::TAddToStateSizeResult> AddToStateSize( - const TString& graphId, - const TCheckpointId& checkpoint, - ui64 size) = 0; - - virtual NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> GetTotalCheckpointsStateSize(const TString& graphId) = 0; + virtual NThreading::TFuture<ICheckpointStorage::TAddToStateSizeResult> AddToStateSize( + const TString& graphId, + const TCheckpointId& checkpoint, + ui64 size) = 0; + + virtual NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> GetTotalCheckpointsStateSize(const TString& graphId) = 0; }; using TCheckpointStoragePtr = TIntrusivePtr<ICheckpointStorage>; diff --git a/ydb/core/yq/libs/checkpoint_storage/events/events.h b/ydb/core/yq/libs/checkpoint_storage/events/events.h index 2641f85bae..2fd8659711 100644 --- a/ydb/core/yq/libs/checkpoint_storage/events/events.h +++ b/ydb/core/yq/libs/checkpoint_storage/events/events.h @@ -28,7 +28,7 @@ struct TEvCheckpointStorage { EvGetCheckpointsMetadataResponse, // Internal Storage events. - EvNewCheckpointSucceeded, + EvNewCheckpointSucceeded, EvEnd, }; @@ -161,15 +161,15 @@ struct TEvCheckpointStorage { struct TEvGetCheckpointsMetadataRequest : NActors::TEventLocal<TEvGetCheckpointsMetadataRequest, EvGetCheckpointsMetadataRequest> { explicit TEvGetCheckpointsMetadataRequest(TString graphId, TVector<ECheckpointStatus> statuses = TVector<ECheckpointStatus>(), ui64 limit = std::numeric_limits<ui64>::max(), bool loadGraphDescription = false) - : GraphId(std::move(graphId)) - , Statuses(std::move(statuses)) + : GraphId(std::move(graphId)) + , Statuses(std::move(statuses)) , Limit(limit) , LoadGraphDescription(loadGraphDescription) { } TString GraphId; - TVector<ECheckpointStatus> Statuses; - ui64 Limit; + TVector<ECheckpointStatus> Statuses; + ui64 Limit; bool LoadGraphDescription = false; }; @@ -185,8 +185,8 @@ struct TEvCheckpointStorage { }; // note that no response exists - struct TEvNewCheckpointSucceeded : NActors::TEventLocal<TEvNewCheckpointSucceeded, EvNewCheckpointSucceeded> { - TEvNewCheckpointSucceeded(TCoordinatorId coordinatorId, TCheckpointId checkpointId) + struct TEvNewCheckpointSucceeded : NActors::TEventLocal<TEvNewCheckpointSucceeded, EvNewCheckpointSucceeded> { + TEvNewCheckpointSucceeded(TCoordinatorId coordinatorId, TCheckpointId checkpointId) : CoordinatorId(std::move(coordinatorId)) , CheckpointId(std::move(checkpointId)) { diff --git a/ydb/core/yq/libs/checkpoint_storage/gc.cpp b/ydb/core/yq/libs/checkpoint_storage/gc.cpp index 2a0e73ecb4..6c98a83381 100644 --- a/ydb/core/yq/libs/checkpoint_storage/gc.cpp +++ b/ydb/core/yq/libs/checkpoint_storage/gc.cpp @@ -64,10 +64,10 @@ public: private: STRICT_STFUNC(StateFunc, - HFunc(TEvCheckpointStorage::TEvNewCheckpointSucceeded, Handle); + HFunc(TEvCheckpointStorage::TEvNewCheckpointSucceeded, Handle); ) - void Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx); }; TActorGC::TActorGC(const TCheckpointStoragePtr& checkpointStorage, const TStateStoragePtr& stateStorage) @@ -83,7 +83,7 @@ void TActorGC::Bootstrap(const TActorContext&) LOG_STREAMS_STORAGE_SERVICE_INFO("Successfully bootstrapped storage GC " << SelfId()); } -void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx) +void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx) { const auto* event = ev->Get(); const auto& graphId = event->CoordinatorId.GraphId; diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp index 07c8d5101a..d21291b2e5 100644 --- a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp +++ b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp @@ -8,12 +8,12 @@ #include <ydb/core/yq/libs/checkpointing_common/defs.h> #include <ydb/core/yq/libs/checkpoint_storage/events/events.h> - + #include <ydb/core/yq/libs/actors/logging/log.h> #include <ydb/core/yq/libs/ydb/util.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> - + #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> @@ -24,7 +24,7 @@ #define LOG_STORAGE_ASYNC_DEBUG(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_DEBUG, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream); #define LOG_STORAGE_ASYNC_INFO(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_INFO, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream); #define LOG_STORAGE_ASYNC_WARN(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_WARN, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream); - + namespace NYq { using namespace NActors; @@ -61,8 +61,8 @@ private: hFunc(TEvCheckpointStorage::TEvAbortCheckpointRequest, Handle); hFunc(TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest, Handle); - hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskState, Handle); - hFunc(NYql::NDq::TEvDqCompute::TEvGetTaskState, Handle); + hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskState, Handle); + hFunc(NYql::NDq::TEvDqCompute::TEvGetTaskState, Handle); ) void Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::TPtr& ev); @@ -74,8 +74,8 @@ private: void Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest::TPtr& ev); - void Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev); - void Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev); + void Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev); + void Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev); }; static void FillDefaultParameters(NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig, NConfig::TYdbStorageConfig& ydbStorageConfig) { @@ -135,255 +135,255 @@ void TStorageProxy::Bootstrap() { void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::TPtr& ev) { const auto* event = ev->Get(); - LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] Got TEvRegisterCoordinatorRequest") - - CheckpointStorage->RegisterGraphCoordinator(event->CoordinatorId) - .Apply([coordinatorId = event->CoordinatorId, - cookie = ev->Cookie, - sender = ev->Sender, - context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) { - auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>(); - response->Issues = issuesFuture.GetValue(); - if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString()) - } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] Graph registered") - } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse") - context.Send(sender, response.release(), 0, cookie); - }); + LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] Got TEvRegisterCoordinatorRequest") + + CheckpointStorage->RegisterGraphCoordinator(event->CoordinatorId) + .Apply([coordinatorId = event->CoordinatorId, + cookie = ev->Cookie, + sender = ev->Sender, + context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) { + auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>(); + response->Issues = issuesFuture.GetValue(); + if (response->Issues) { + LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString()) + } else { + LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] Graph registered") + } + LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse") + context.Send(sender, response.release(), 0, cookie); + }); } void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPtr& ev) { const auto* event = ev->Get(); - LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvCreateCheckpointRequest") + LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvCreateCheckpointRequest") - CheckpointStorage->GetTotalCheckpointsStateSize(event->CoordinatorId.GraphId) - .Apply([checkpointId = event->CheckpointId, - coordinatorId = event->CoordinatorId, - cookie = ev->Cookie, - sender = ev->Sender, + CheckpointStorage->GetTotalCheckpointsStateSize(event->CoordinatorId.GraphId) + .Apply([checkpointId = event->CheckpointId, + coordinatorId = event->CoordinatorId, + cookie = ev->Cookie, + sender = ev->Sender, totalGraphCheckpointsSizeLimit = Config.GetStateStorageLimits().GetMaxGraphCheckpointsSizeBytes(), - context = TActivationContext::AsActorContext()] - (const NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult>& resultFuture) { - auto result = resultFuture.GetValue(); - auto issues = result.second; - - if (issues) { + context = TActivationContext::AsActorContext()] + (const NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult>& resultFuture) { + auto result = resultFuture.GetValue(); + auto issues = result.second; + + if (issues) { LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString()); context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); - return false; - } - - auto totalGraphCheckpointsSize = result.first; - - if (totalGraphCheckpointsSize > totalGraphCheckpointsSizeLimit) { - TStringStream ss; - ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize; - auto message = ss.Str(); - LOG_STORAGE_ASYNC_WARN(context, message) - issues.AddIssue(message); + return false; + } + + auto totalGraphCheckpointsSize = result.first; + + if (totalGraphCheckpointsSize > totalGraphCheckpointsSizeLimit) { + TStringStream ss; + ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize; + auto message = ss.Str(); + LOG_STORAGE_ASYNC_WARN(context, message) + issues.AddIssue(message); LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); - return false; - } - return true; - }) + return false; + } + return true; + }) .Apply([checkpointId = event->CheckpointId, coordinatorId = event->CoordinatorId, cookie = ev->Cookie, sender = ev->Sender, graphDesc = event->GraphDescription, storage = CheckpointStorage] - (const NThreading::TFuture<bool>& passedSizeLimitCheckFuture) { - if (!passedSizeLimitCheckFuture.GetValue()) { + (const NThreading::TFuture<bool>& passedSizeLimitCheckFuture) { + if (!passedSizeLimitCheckFuture.GetValue()) { return NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>(); - } + } if (std::holds_alternative<TString>(graphDesc)) { return storage->CreateCheckpoint(coordinatorId, checkpointId, std::get<TString>(graphDesc), ECheckpointStatus::Pending); } else { return storage->CreateCheckpoint(coordinatorId, checkpointId, std::get<NProto::TCheckpointGraphDescription>(graphDesc), ECheckpointStatus::Pending); } - }) - .Apply([checkpointId = event->CheckpointId, - coordinatorId = event->CoordinatorId, - cookie = ev->Cookie, - sender = ev->Sender, - context = TActivationContext::AsActorContext()] + }) + .Apply([checkpointId = event->CheckpointId, + coordinatorId = event->CoordinatorId, + cookie = ev->Cookie, + sender = ev->Sender, + context = TActivationContext::AsActorContext()] (const NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>& resultFuture) { if (!resultFuture.Initialized()) { // didn't pass the size limit check - return; - } + return; + } auto result = resultFuture.GetValue(); auto issues = result.second; auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move(issues), result.first); - if (response->Issues) { + if (response->Issues) { LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString()); - } else { + } else { LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created"); - } + } LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); - context.Send(sender, response.release(), 0, cookie); - }); + context.Send(sender, response.release(), 0, cookie); + }); } void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest::TPtr& ev) { const auto* event = ev->Get(); - LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvSetCheckpointPendingCommitStatusRequest") - CheckpointStorage->UpdateCheckpointStatus(event->CoordinatorId, event->CheckpointId, ECheckpointStatus::PendingCommit, ECheckpointStatus::Pending) - .Apply([checkpointId = event->CheckpointId, - coordinatorId = event->CoordinatorId, - cookie = ev->Cookie, - sender = ev->Sender, - context = TActivationContext::AsActorContext()] - (const NThreading::TFuture<NYql::TIssues>& issuesFuture) { - auto issues = issuesFuture.GetValue(); - auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move(issues)); - if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString()) - } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'") - } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse") - context.Send(sender, response.release(), 0, cookie); - }); + LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvSetCheckpointPendingCommitStatusRequest") + CheckpointStorage->UpdateCheckpointStatus(event->CoordinatorId, event->CheckpointId, ECheckpointStatus::PendingCommit, ECheckpointStatus::Pending) + .Apply([checkpointId = event->CheckpointId, + coordinatorId = event->CoordinatorId, + cookie = ev->Cookie, + sender = ev->Sender, + context = TActivationContext::AsActorContext()] + (const NThreading::TFuture<NYql::TIssues>& issuesFuture) { + auto issues = issuesFuture.GetValue(); + auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move(issues)); + if (response->Issues) { + LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString()) + } else { + LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'") + } + LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse") + context.Send(sender, response.release(), 0, cookie); + }); } void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::TPtr& ev) { const auto* event = ev->Get(); - LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvCompleteCheckpointRequest") - CheckpointStorage->UpdateCheckpointStatus(event->CoordinatorId, event->CheckpointId, ECheckpointStatus::Completed, ECheckpointStatus::PendingCommit) - .Apply([checkpointId = event->CheckpointId, - coordinatorId = event->CoordinatorId, - cookie = ev->Cookie, - sender = ev->Sender, + LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvCompleteCheckpointRequest") + CheckpointStorage->UpdateCheckpointStatus(event->CoordinatorId, event->CheckpointId, ECheckpointStatus::Completed, ECheckpointStatus::PendingCommit) + .Apply([checkpointId = event->CheckpointId, + coordinatorId = event->CoordinatorId, + cookie = ev->Cookie, + sender = ev->Sender, gcEnabled = Config.GetCheckpointGarbageConfig().GetEnabled(), - actorGC = ActorGC, - context = TActivationContext::AsActorContext()] - (const NThreading::TFuture<NYql::TIssues>& issuesFuture) { - auto issues = issuesFuture.GetValue(); - auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move(issues)); - if (response->Issues) { - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString()) - } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'") - if (gcEnabled) { - auto request = std::make_unique<TEvCheckpointStorage::TEvNewCheckpointSucceeded>(coordinatorId, checkpointId); - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded") - context.Send(actorGC, request.release(), 0); - } - } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse") - context.Send(sender, response.release(), 0, cookie); - }); + actorGC = ActorGC, + context = TActivationContext::AsActorContext()] + (const NThreading::TFuture<NYql::TIssues>& issuesFuture) { + auto issues = issuesFuture.GetValue(); + auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move(issues)); + if (response->Issues) { + LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString()) + } else { + LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'") + if (gcEnabled) { + auto request = std::make_unique<TEvCheckpointStorage::TEvNewCheckpointSucceeded>(coordinatorId, checkpointId); + LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded") + context.Send(actorGC, request.release(), 0); + } + } + LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse") + context.Send(sender, response.release(), 0, cookie); + }); } void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr& ev) { const auto* event = ev->Get(); - LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvAbortCheckpointRequest") - CheckpointStorage->AbortCheckpoint(event->CoordinatorId,event->CheckpointId) - .Apply([checkpointId = event->CheckpointId, - coordinatorId = event->CoordinatorId, - cookie = ev->Cookie, - sender = ev->Sender, - context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) { - auto issues = issuesFuture.GetValue(); - auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move(issues)); - if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString()) - } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted") - } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse") - context.Send(sender, response.release(), 0, cookie); - }); + LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvAbortCheckpointRequest") + CheckpointStorage->AbortCheckpoint(event->CoordinatorId,event->CheckpointId) + .Apply([checkpointId = event->CheckpointId, + coordinatorId = event->CoordinatorId, + cookie = ev->Cookie, + sender = ev->Sender, + context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) { + auto issues = issuesFuture.GetValue(); + auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move(issues)); + if (response->Issues) { + LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString()) + } else { + LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted") + } + LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse") + context.Send(sender, response.release(), 0, cookie); + }); } void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest::TPtr& ev) { const auto* event = ev->Get(); - LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->GraphId << "] Got TEvGetCheckpointsMetadataRequest") + LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->GraphId << "] Got TEvGetCheckpointsMetadataRequest") CheckpointStorage->GetCheckpoints(event->GraphId, event->Statuses, event->Limit, event->LoadGraphDescription) - .Apply([graphId = event->GraphId, - cookie = ev->Cookie, - sender = ev->Sender, - context = TActivationContext::AsActorContext()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) { - auto result = futureResult.GetValue(); - auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first, result.second); - if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString()) - } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse") - context.Send(sender, response.release(), 0, cookie); - }); + .Apply([graphId = event->GraphId, + cookie = ev->Cookie, + sender = ev->Sender, + context = TActivationContext::AsActorContext()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) { + auto result = futureResult.GetValue(); + auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first, result.second); + if (response->Issues) { + LOG_STORAGE_ASYNC_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString()) + } + LOG_STORAGE_ASYNC_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse") + context.Send(sender, response.release(), 0, cookie); + }); } -void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev) { +void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev) { auto* event = ev->Get(); - const auto checkpointId = TCheckpointId(event->Checkpoint.GetGeneration(), event->Checkpoint.GetId()); + const auto checkpointId = TCheckpointId(event->Checkpoint.GetGeneration(), event->Checkpoint.GetId()); LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << checkpointId << "] Got TEvSaveTaskState: task " << event->TaskId); - + const size_t stateSize = event->State.ByteSizeLong(); if (stateSize > Config.GetStateStorageLimits().GetMaxTaskStateSizeBytes()) { LOG_STREAMS_STORAGE_SERVICE_WARN("[" << checkpointId << "] Won't save task state because it's too big: task: " << event->TaskId << ", state size: " << stateSize << "/" << Config.GetStateStorageLimits().GetMaxTaskStateSizeBytes()); - auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>(); - response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); - response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo); - response->Record.SetStateSizeBytes(0); + auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>(); + response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); + response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo); + response->Record.SetStateSizeBytes(0); response->Record.SetTaskId(event->TaskId); - response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STATE_TOO_BIG); - Send(ev->Sender, response.release()); - return; - } - - StateStorage->SaveState(event->TaskId, event->GraphId, checkpointId, event->State) - .Apply([graphId = event->GraphId, - checkpointId, - taskId = event->TaskId, - cookie = ev->Cookie, - sender = ev->Sender, + response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STATE_TOO_BIG); + Send(ev->Sender, response.release()); + return; + } + + StateStorage->SaveState(event->TaskId, event->GraphId, checkpointId, event->State) + .Apply([graphId = event->GraphId, + checkpointId, + taskId = event->TaskId, + cookie = ev->Cookie, + sender = ev->Sender, stateSize = stateSize, - context = TActivationContext::AsActorContext()](const NThreading::TFuture<NYql::TIssues>& futureResult) { - const auto& issues = futureResult.GetValue(); - auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>(); - response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); - response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo); - response->Record.SetStateSizeBytes(stateSize); + context = TActivationContext::AsActorContext()](const NThreading::TFuture<NYql::TIssues>& futureResult) { + const auto& issues = futureResult.GetValue(); + auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>(); + response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); + response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo); + response->Record.SetStateSizeBytes(stateSize); response->Record.SetTaskId(taskId); - - if (issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString()) - response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR); - } else { - response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK); - } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult") - context.Send(sender, response.release(), 0, cookie); - }); + + if (issues) { + LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString()) + response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR); + } else { + response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK); + } + LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult") + context.Send(sender, response.release(), 0, cookie); + }); } -void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) { +void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) { const auto* event = ev->Get(); - const auto checkpointId = TCheckpointId(event->Checkpoint.GetGeneration(), event->Checkpoint.GetId()); + const auto checkpointId = TCheckpointId(event->Checkpoint.GetGeneration(), event->Checkpoint.GetId()); LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << checkpointId << "] Got TEvGetTaskState: tasks {" << JoinSeq(", ", event->TaskIds) << "}"); - + StateStorage->GetState(event->TaskIds, event->GraphId, checkpointId) - .Apply([checkpointId = event->Checkpoint, - generation = event->Generation, + .Apply([checkpointId = event->Checkpoint, + generation = event->Generation, taskIds = event->TaskIds, - cookie = ev->Cookie, - sender = ev->Sender, - context = TActivationContext::AsActorContext()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) { - auto result = resultFuture.GetValue(); + cookie = ev->Cookie, + sender = ev->Sender, + context = TActivationContext::AsActorContext()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) { + auto result = resultFuture.GetValue(); auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvGetTaskStateResult>(checkpointId, result.second, generation); std::swap(response->States, result.first); - if (response->Issues) { + if (response->Issues) { LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString()); - } + } LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult"); - context.Send(sender, response.release(), 0, cookie); - }); + context.Send(sender, response.release(), 0, cookie); + }); } } // namespace diff --git a/ydb/core/yq/libs/checkpoint_storage/ya.make b/ydb/core/yq/libs/checkpoint_storage/ya.make index fc2f449e37..b11c319eaa 100644 --- a/ydb/core/yq/libs/checkpoint_storage/ya.make +++ b/ydb/core/yq/libs/checkpoint_storage/ya.make @@ -26,8 +26,8 @@ PEERDIR( ydb/library/yql/dq/proto ) -YQL_LAST_ABI_VERSION() - +YQL_LAST_ABI_VERSION() + END() RECURSE( diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp index 7799b52daa..034254bb4b 100644 --- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp +++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp @@ -7,7 +7,7 @@ #include <util/stream/str.h> #include <util/string/builder.h> -#include <util/string/printf.h> +#include <util/string/printf.h> #include <fmt/format.h> @@ -87,18 +87,18 @@ using TGetCoordinatorsContextPtr = TIntrusivePtr<TGetCoordinatorsContext>; //////////////////////////////////////////////////////////////////////////////// -struct TAddToStateSizeContext : public TThrRefBase { - ui64 Size = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -struct TGetTotalCheckpointsStateSizeContext : public TThrRefBase { - ui64 Size = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - +struct TAddToStateSizeContext : public TThrRefBase { + ui64 Size = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TGetTotalCheckpointsStateSizeContext : public TThrRefBase { + ui64 Size = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + TFuture<TDataQueryResult> SelectGraphCoordinators(const TGenerationContextPtr& context) { // TODO: use prepared queries @@ -239,14 +239,14 @@ TFuture<TStatus> UpdateCheckpoint(const TCheckpointContextPtr& context) { PRAGMA TablePathPrefix("%s"); $ts = cast(%lu as Timestamp); - UPSERT INTO %s (graph_id, coordinator_generation, seq_no, status, modified_by) VALUES + UPSERT INTO %s (graph_id, coordinator_generation, seq_no, status, modified_by) VALUES ("%s", %lu, %lu, %u, $ts); )", generationContext->TablePathPrefix.c_str(), TInstant::Now().MicroSeconds(), CheckpointsMetadataTable, generationContext->PrimaryKey.c_str(), - context->CheckpointId.CoordinatorGeneration, - context->CheckpointId.SeqNo, + context->CheckpointId.CoordinatorGeneration, + context->CheckpointId.SeqNo, (ui32)context->Status); auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx(); @@ -325,22 +325,22 @@ TFuture<TStatus> CreateCheckpointWrapper( TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& context, const TVector<ECheckpointStatus>& statuses, ui64 limit, TExecDataQuerySettings settings, bool loadGraphDescription) { - NYdb::TParamsBuilder paramsBuilder; - if (statuses) { - auto& statusesParam = paramsBuilder.AddParam("$statuses").BeginList(); - for (const auto& status : statuses) { - statusesParam.AddListItem().Uint8(static_cast<ui8>(status)); - } - statusesParam.EndList().Build(); - } - - paramsBuilder.AddParam("$graph_id").String(context->PrimaryKey).Build(); + NYdb::TParamsBuilder paramsBuilder; + if (statuses) { + auto& statusesParam = paramsBuilder.AddParam("$statuses").BeginList(); + for (const auto& status : statuses) { + statusesParam.AddListItem().Uint8(static_cast<ui8>(status)); + } + statusesParam.EndList().Build(); + } + + paramsBuilder.AddParam("$graph_id").String(context->PrimaryKey).Build(); if (limit < std::numeric_limits<ui64>::max()) { paramsBuilder.AddParam("$limit").Uint64(limit).Build(); } - - auto params = paramsBuilder.Build(); - + + auto params = paramsBuilder.Build(); + using namespace fmt::literals; TString join; if (loadGraphDescription) { @@ -355,12 +355,12 @@ TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& co const TString query = fmt::format(R"sql( --!syntax_v1 PRAGMA TablePathPrefix("{table_path_prefix}"); - PRAGMA AnsiInForEmptyOrNullableItemsCollections; + PRAGMA AnsiInForEmptyOrNullableItemsCollections; DECLARE $graph_id AS String; {optional_statuses_declaration} {optional_limit_declaration} - + SELECT {graph_description_field} metadata.coordinator_generation AS coordinator_generation, @@ -372,7 +372,7 @@ TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& co {join} WHERE metadata.graph_id = $graph_id {statuses_condition} - ORDER BY coordinator_generation, seq_no DESC + ORDER BY coordinator_generation, seq_no DESC {limit_condition}; )sql", "table_path_prefix"_a = context->TablePathPrefix, @@ -387,9 +387,9 @@ TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& co return context->Session.ExecuteDataQuery( query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params, - settings); + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params, + settings); } TFuture<TStatus> ProcessCheckpoints( @@ -406,15 +406,15 @@ TFuture<TStatus> ProcessCheckpoints( while (parser.TryNextRow()) { TCheckpointId checkpointId( - *parser.ColumnParser("coordinator_generation").GetOptionalUint64(), - *parser.ColumnParser("seq_no").GetOptionalUint64()); + *parser.ColumnParser("coordinator_generation").GetOptionalUint64(), + *parser.ColumnParser("seq_no").GetOptionalUint64()); getContext->Checkpoints.emplace_back( context->PrimaryKey, checkpointId, ECheckpointStatus(*parser.ColumnParser("status").GetOptionalUint8()), - *parser.ColumnParser("created_by").GetOptionalTimestamp(), - *parser.ColumnParser("modified_by").GetOptionalTimestamp()); + *parser.ColumnParser("created_by").GetOptionalTimestamp(), + *parser.ColumnParser("modified_by").GetOptionalTimestamp()); if (loadGraphDescription) { if (const TMaybe<TString> graphDescription = parser.ColumnParser("graph_description").GetOptionalString(); graphDescription && *graphDescription) { @@ -446,12 +446,12 @@ TFuture<TDataQueryResult> SelectCheckpoint(const TCheckpointContextPtr& context) SELECT status FROM %s - WHERE graph_id = "%s" AND coordinator_generation = %lu AND seq_no = %lu; + WHERE graph_id = "%s" AND coordinator_generation = %lu AND seq_no = %lu; )", generationContext->TablePathPrefix.c_str(), CheckpointsMetadataTable, - generationContext->PrimaryKey.c_str(), - context->CheckpointId.CoordinatorGeneration, - context->CheckpointId.SeqNo); + generationContext->PrimaryKey.c_str(), + context->CheckpointId.CoordinatorGeneration, + context->CheckpointId.SeqNo); return generationContext->Session.ExecuteDataQuery( query, @@ -549,7 +549,7 @@ TFuture<TStatus> UpdateCheckpointWithCheckWrapper( class TCheckpointStorage : public ICheckpointStorage { TYdbConnectionPtr YdbConnection; - const NConfig::TYdbStorageConfig Config; + const NConfig::TYdbStorageConfig Config; public: explicit TCheckpointStorage( @@ -585,14 +585,14 @@ public: TFuture<TIssues> AbortCheckpoint( const TCoordinatorId& coordinator, - const TCheckpointId& checkpointId) override; + const TCheckpointId& checkpointId) override; TFuture<TGetCheckpointsResult> GetCheckpoints( const TString& graph) override; - TFuture<TGetCheckpointsResult> GetCheckpoints( + TFuture<TGetCheckpointsResult> GetCheckpoints( const TString& graph, const TVector<ECheckpointStatus>& statuses, ui64 limit, bool loadGraphDescription) override; - + TFuture<TIssues> DeleteGraph( const TString& graphId) override; @@ -603,14 +603,14 @@ public: TFuture<TIssues> DeleteMarkedCheckpoints( const TString& graphId, const TCheckpointId& checkpointUpperBound) override; - - TFuture<ICheckpointStorage::TAddToStateSizeResult> AddToStateSize( - const TString& graphId, - const TCheckpointId& checkpoint, - ui64 size) override; - - TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> GetTotalCheckpointsStateSize(const TString& graphId) override; - TExecDataQuerySettings DefaultExecDataQuerySettings(); + + TFuture<ICheckpointStorage::TAddToStateSizeResult> AddToStateSize( + const TString& graphId, + const TCheckpointId& checkpoint, + ui64 size) override; + + TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> GetTotalCheckpointsStateSize(const TString& graphId) override; + TExecDataQuerySettings DefaultExecDataQuerySettings(); private: TFuture<TCreateCheckpointResult> CreateCheckpointImpl(const TCoordinatorId& coordinator, const TCheckpointContextPtr& context); @@ -626,7 +626,7 @@ TCheckpointStorage::TCheckpointStorage( const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const IEntityIdGenerator::TPtr& entityIdGenerator) : YdbConnection(NewYdbConnection(config, credentialsProviderFactory)) - , Config(config) + , Config(config) , EntityIdGenerator(entityIdGenerator) { } @@ -685,14 +685,14 @@ TFuture<TIssues> TCheckpointStorage::Init() // so we set it primary key column to have index auto checkpointDesc = TTableBuilder() .AddNullableColumn("graph_id", EPrimitiveType::String) - .AddNullableColumn("coordinator_generation", EPrimitiveType::Uint64) - .AddNullableColumn("seq_no", EPrimitiveType::Uint64) + .AddNullableColumn("coordinator_generation", EPrimitiveType::Uint64) + .AddNullableColumn("seq_no", EPrimitiveType::Uint64) .AddNullableColumn("status", EPrimitiveType::Uint8) - .AddNullableColumn("created_by", EPrimitiveType::Timestamp) - .AddNullableColumn("modified_by", EPrimitiveType::Timestamp) - .AddNullableColumn("state_size", EPrimitiveType::Uint64) + .AddNullableColumn("created_by", EPrimitiveType::Timestamp) + .AddNullableColumn("modified_by", EPrimitiveType::Timestamp) + .AddNullableColumn("state_size", EPrimitiveType::Uint64) .AddNullableColumn("graph_description_id", EPrimitiveType::String) - .SetPrimaryKeyColumns({"graph_id", "coordinator_generation", "seq_no"}) + .SetPrimaryKeyColumns({"graph_id", "coordinator_generation", "seq_no"}) .Build(); RUN_CREATE_TABLE(CheckpointsMetadataTable, checkpointDesc); @@ -870,10 +870,10 @@ TFuture<TIssues> TCheckpointStorage::AbortCheckpoint( return StatusToIssues(future); } -TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckpoints(const TString& graph) { +TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckpoints(const TString& graph) { return GetCheckpoints(graph, TVector<ECheckpointStatus>(), std::numeric_limits<ui64>::max(), true); -} - +} + TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckpoints( const TString& graph, const TVector<ECheckpointStatus>& statuses, ui64 limit, bool loadGraphDescription) { @@ -953,18 +953,18 @@ TFuture<TIssues> TCheckpointStorage::MarkCheckpointsGC( $ts = cast(%lu as Timestamp); UPDATE %s - SET status = %u, modified_by = $ts + SET status = %u, modified_by = $ts WHERE graph_id = "%s" AND - (coordinator_generation < %lu OR - (coordinator_generation = %lu AND seq_no < %lu)); + (coordinator_generation < %lu OR + (coordinator_generation = %lu AND seq_no < %lu)); )", prefix.c_str(), TInstant::Now().MicroSeconds(), CheckpointsMetadataTable, (ui32)ECheckpointStatus::GC, graphId.c_str(), - checkpointUpperBound.CoordinatorGeneration, - checkpointUpperBound.CoordinatorGeneration, - checkpointUpperBound.SeqNo); + checkpointUpperBound.CoordinatorGeneration, + checkpointUpperBound.CoordinatorGeneration, + checkpointUpperBound.SeqNo); auto future = session.ExecuteDataQuery( query, @@ -1056,127 +1056,127 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints( return StatusToIssues(future); } TFuture<ICheckpointStorage::TAddToStateSizeResult> NYq::TCheckpointStorage::AddToStateSize(const TString& graphId, const TCheckpointId& checkpointId, ui64 size) { - auto result = MakeIntrusive<TAddToStateSizeContext>(); - auto future = YdbConnection->Client.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, size, result, thisPtr = TIntrusivePtr(this)](TSession session) { - NYdb::TParamsBuilder paramsBuilder; - paramsBuilder.AddParam("$graph_id").String(graphId).Build(); - paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointId.CoordinatorGeneration).Build(); - paramsBuilder.AddParam("$seq_no").Uint64(checkpointId.SeqNo).Build(); - paramsBuilder.AddParam("$task_state_size").Uint64(size).Build(); - auto params = paramsBuilder.Build(); - - auto query = Sprintf(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("%s"); - - declare $graph_id as string; - declare $coordinator_generation as Uint64; - declare $seq_no as Uint64; - declare $task_state_size as Uint64; - - $current_size = SELECT state_size - FROM %s - WHERE graph_id = $graph_id - AND coordinator_generation = $coordinator_generation - AND seq_no = $seq_no; - - UPDATE %s - SET state_size = $current_size + $task_state_size - WHERE graph_id = $graph_id - AND coordinator_generation = $coordinator_generation - AND seq_no = $seq_no; - - SELECT $current_size + $task_state_size; - )", prefix.c_str(), CheckpointsMetadataTable, CheckpointsMetadataTable); - - return session.ExecuteDataQuery( - query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params, - thisPtr->DefaultExecDataQuerySettings()) - .Apply( - [result, graphId, checkpointId](const TFuture<TDataQueryResult>& future) { - const auto& queryResult = future.GetValue(); - auto status = TStatus(queryResult); - - if (!queryResult.IsSuccess()) { - Cerr << "AddToStateSize: can't update state size [" << graphId << " " << checkpointId << "] " << queryResult.GetIssues().ToString() << Endl; - return status; - } - TResultSetParser parser = queryResult.GetResultSetParser(0); - if (parser.TryNextRow()) { - result->Size = parser.ColumnParser(0).GetOptionalUint64().GetOrElse(0); - } else { - Cerr << "ERROR: got no rows in AddToStateSize result set" << Endl; - } - return status; - }); - }); - - return future.Apply( - [result](const TFuture<TStatus>& status) { - return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues())); - }); -} - + auto result = MakeIntrusive<TAddToStateSizeContext>(); + auto future = YdbConnection->Client.RetryOperation( + [prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, size, result, thisPtr = TIntrusivePtr(this)](TSession session) { + NYdb::TParamsBuilder paramsBuilder; + paramsBuilder.AddParam("$graph_id").String(graphId).Build(); + paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointId.CoordinatorGeneration).Build(); + paramsBuilder.AddParam("$seq_no").Uint64(checkpointId.SeqNo).Build(); + paramsBuilder.AddParam("$task_state_size").Uint64(size).Build(); + auto params = paramsBuilder.Build(); + + auto query = Sprintf(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("%s"); + + declare $graph_id as string; + declare $coordinator_generation as Uint64; + declare $seq_no as Uint64; + declare $task_state_size as Uint64; + + $current_size = SELECT state_size + FROM %s + WHERE graph_id = $graph_id + AND coordinator_generation = $coordinator_generation + AND seq_no = $seq_no; + + UPDATE %s + SET state_size = $current_size + $task_state_size + WHERE graph_id = $graph_id + AND coordinator_generation = $coordinator_generation + AND seq_no = $seq_no; + + SELECT $current_size + $task_state_size; + )", prefix.c_str(), CheckpointsMetadataTable, CheckpointsMetadataTable); + + return session.ExecuteDataQuery( + query, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params, + thisPtr->DefaultExecDataQuerySettings()) + .Apply( + [result, graphId, checkpointId](const TFuture<TDataQueryResult>& future) { + const auto& queryResult = future.GetValue(); + auto status = TStatus(queryResult); + + if (!queryResult.IsSuccess()) { + Cerr << "AddToStateSize: can't update state size [" << graphId << " " << checkpointId << "] " << queryResult.GetIssues().ToString() << Endl; + return status; + } + TResultSetParser parser = queryResult.GetResultSetParser(0); + if (parser.TryNextRow()) { + result->Size = parser.ColumnParser(0).GetOptionalUint64().GetOrElse(0); + } else { + Cerr << "ERROR: got no rows in AddToStateSize result set" << Endl; + } + return status; + }); + }); + + return future.Apply( + [result](const TFuture<TStatus>& status) { + return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues())); + }); +} + TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> NYq::TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) { - auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>(); - auto future = YdbConnection->Client.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) { - NYdb::TParamsBuilder paramsBuilder; - paramsBuilder.AddParam("$graph_id").String(graphId).Build(); - auto params = paramsBuilder.Build(); - - auto query = Sprintf(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("%s"); - - declare $graph_id as string; - - SELECT SUM(state_size) - FROM %s - WHERE graph_id = $graph_id - )", prefix.c_str(), CheckpointsMetadataTable); - - return session.ExecuteDataQuery( - query, - TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx(), - params, - thisPtr->DefaultExecDataQuerySettings()) - .Apply( - [graphId, result](const TFuture<TDataQueryResult>& future) { - const auto& queryResult = future.GetValue(); - auto status = TStatus(queryResult); - - if (!queryResult.IsSuccess()) { - Cerr << "AddToStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString() << Endl; - return status; - } - - TResultSetParser parser = queryResult.GetResultSetParser(0); - if (parser.TryNextRow()) { - result->Size = parser.ColumnParser(0).GetOptionalUint64().GetOrElse(0); - } else { - result->Size = 0; - } - return status; - }); - }); - return future.Apply( - [result](const TFuture<TStatus>& status) { - return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues())); - }); -} - + auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>(); + auto future = YdbConnection->Client.RetryOperation( + [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) { + NYdb::TParamsBuilder paramsBuilder; + paramsBuilder.AddParam("$graph_id").String(graphId).Build(); + auto params = paramsBuilder.Build(); + + auto query = Sprintf(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("%s"); + + declare $graph_id as string; + + SELECT SUM(state_size) + FROM %s + WHERE graph_id = $graph_id + )", prefix.c_str(), CheckpointsMetadataTable); + + return session.ExecuteDataQuery( + query, + TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx(), + params, + thisPtr->DefaultExecDataQuerySettings()) + .Apply( + [graphId, result](const TFuture<TDataQueryResult>& future) { + const auto& queryResult = future.GetValue(); + auto status = TStatus(queryResult); + + if (!queryResult.IsSuccess()) { + Cerr << "AddToStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString() << Endl; + return status; + } + + TResultSetParser parser = queryResult.GetResultSetParser(0); + if (parser.TryNextRow()) { + result->Size = parser.ColumnParser(0).GetOptionalUint64().GetOrElse(0); + } else { + result->Size = 0; + } + return status; + }); + }); + return future.Apply( + [result](const TFuture<TStatus>& status) { + return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues())); + }); +} + TExecDataQuerySettings NYq::TCheckpointStorage::DefaultExecDataQuerySettings() { - return TExecDataQuerySettings() - .KeepInQueryCache(true) - .ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec())) - .OperationTimeout(TDuration::Seconds(Config.GetOperationTimeoutSec())) - .CancelAfter(TDuration::Seconds(Config.GetCancelAfterSec())); -} - + return TExecDataQuerySettings() + .KeepInQueryCache(true) + .ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec())) + .OperationTimeout(TDuration::Seconds(Config.GetOperationTimeoutSec())) + .CancelAfter(TDuration::Seconds(Config.GetCancelAfterSec())); +} + } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp index ab209ec1e8..6a57254e10 100644 --- a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp +++ b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp @@ -147,7 +147,7 @@ TFuture<TStatus> ProcessState( class TStateStorage : public IStateStorage { TYdbConnectionPtr YdbConnection; - const NConfig::TYdbStorageConfig Config; + const NConfig::TYdbStorageConfig Config; public: explicit TStateStorage( @@ -178,10 +178,10 @@ public: TFuture<TIssues> DeleteCheckpoints( const TString& graphId, const TCheckpointId& checkpointId) override; - - TFuture<TDataQueryResult> SelectState(const TContextPtr& context); - TFuture<TStatus> UpsertState(const TContextPtr& context); - TExecDataQuerySettings DefaultExecDataQuerySettings(); + + TFuture<TDataQueryResult> SelectState(const TContextPtr& context); + TFuture<TStatus> UpsertState(const TContextPtr& context); + TExecDataQuerySettings DefaultExecDataQuerySettings(); }; //////////////////////////////////////////////////////////////////////////////// @@ -190,7 +190,7 @@ TStateStorage::TStateStorage( const NConfig::TYdbStorageConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) : YdbConnection(NewYdbConnection(config, credentialsProviderFactory)) - , Config(config) + , Config(config) { } @@ -219,12 +219,12 @@ TFuture<TIssues> TStateStorage::Init() } auto stateDesc = TTableBuilder() - .AddNullableColumn("graph_id", EPrimitiveType::String) + .AddNullableColumn("graph_id", EPrimitiveType::String) .AddNullableColumn("task_id", EPrimitiveType::Uint64) - .AddNullableColumn("coordinator_generation", EPrimitiveType::Uint64) - .AddNullableColumn("seq_no", EPrimitiveType::Uint64) + .AddNullableColumn("coordinator_generation", EPrimitiveType::Uint64) + .AddNullableColumn("seq_no", EPrimitiveType::Uint64) .AddNullableColumn("blob", EPrimitiveType::String) - .SetPrimaryKeyColumns({"graph_id", "task_id", "coordinator_generation", "seq_no"}) + .SetPrimaryKeyColumns({"graph_id", "task_id", "coordinator_generation", "seq_no"}) .Build(); auto status = CreateTable(YdbConnection, StatesTable, std::move(stateDesc)).GetValueSync(); @@ -260,7 +260,7 @@ TFuture<TIssues> TStateStorage::SaveState( state, session); - return thisPtr->UpsertState(context); + return thisPtr->UpsertState(context); }); return StatusToIssues(future); @@ -292,7 +292,7 @@ TFuture<IStateStorage::TGetStateResult> TStateStorage::GetState( auto future = YdbConnection->Client.RetryOperation( [context, thisPtr = TIntrusivePtr(this)] (TSession session) { context->Session = session; - auto future = thisPtr->SelectState(context); + auto future = thisPtr->SelectState(context); return future.Apply( [context] (const TFuture<TDataQueryResult>& future) { return ProcessState(future.GetValue(), context); @@ -315,33 +315,33 @@ TFuture<IStateStorage::TCountStatesResult> TStateStorage::CountStates( auto context = MakeIntrusive<TCountStateContext>(); auto future = YdbConnection->Client.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, context, thisPtr = TIntrusivePtr(this)] (TSession session) { - - // publish nodes - NYdb::TParamsBuilder paramsBuilder; - paramsBuilder.AddParam("$graph_id").String(graphId).Build(); - paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointId.CoordinatorGeneration).Build(); - paramsBuilder.AddParam("$seq_no").Uint64(checkpointId.SeqNo).Build(); - - auto params = paramsBuilder.Build(); + [prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, context, thisPtr = TIntrusivePtr(this)] (TSession session) { + + // publish nodes + NYdb::TParamsBuilder paramsBuilder; + paramsBuilder.AddParam("$graph_id").String(graphId).Build(); + paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointId.CoordinatorGeneration).Build(); + paramsBuilder.AddParam("$seq_no").Uint64(checkpointId.SeqNo).Build(); + + auto params = paramsBuilder.Build(); auto query = Sprintf(R"( --!syntax_v1 PRAGMA TablePathPrefix("%s"); - declare $graph_id as string; - declare $coordinator_generation as Uint64; - declare $seq_no as Uint64; - + declare $graph_id as string; + declare $coordinator_generation as Uint64; + declare $seq_no as Uint64; + SELECT COUNT(*) as cnt FROM %s - WHERE graph_id = $graph_id AND coordinator_generation = $coordinator_generation and seq_no = $seq_no; - )", prefix.c_str(), StatesTable); + WHERE graph_id = $graph_id AND coordinator_generation = $coordinator_generation and seq_no = $seq_no; + )", prefix.c_str(), StatesTable); auto future = session.ExecuteDataQuery( query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params, - thisPtr->DefaultExecDataQuerySettings()); + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params, + thisPtr->DefaultExecDataQuerySettings()); return future.Apply( [context] (const TFuture<TDataQueryResult>& future) { @@ -370,30 +370,30 @@ TFuture<IStateStorage::TCountStatesResult> TStateStorage::CountStates( return countResult; }); } -TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() { - return TExecDataQuerySettings() - .KeepInQueryCache(true) - .ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec())) - .OperationTimeout(TDuration::Seconds(Config.GetOperationTimeoutSec())) - .CancelAfter(TDuration::Seconds(Config.GetCancelAfterSec())); -} +TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() { + return TExecDataQuerySettings() + .KeepInQueryCache(true) + .ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec())) + .OperationTimeout(TDuration::Seconds(Config.GetOperationTimeoutSec())) + .CancelAfter(TDuration::Seconds(Config.GetCancelAfterSec())); +} TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) { auto future = YdbConnection->Client.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this)] (TSession session) { - - // publish nodes - NYdb::TParamsBuilder paramsBuilder; - paramsBuilder.AddParam("$graph_id").String(graphId).Build(); - - auto params = paramsBuilder.Build(); - + [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this)] (TSession session) { + + // publish nodes + NYdb::TParamsBuilder paramsBuilder; + paramsBuilder.AddParam("$graph_id").String(graphId).Build(); + + auto params = paramsBuilder.Build(); + auto query = Sprintf(R"( --!syntax_v1 PRAGMA TablePathPrefix("%s"); - declare $graph_id as string; - + declare $graph_id as string; + DELETE FROM %s WHERE graph_id = "%s"; @@ -401,9 +401,9 @@ TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) { auto future = session.ExecuteDataQuery( query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params, - thisPtr->DefaultExecDataQuerySettings()); + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params, + thisPtr->DefaultExecDataQuerySettings()); return future.Apply( [] (const TFuture<TDataQueryResult>& future) { @@ -420,36 +420,36 @@ TFuture<TIssues> TStateStorage::DeleteCheckpoints( const TCheckpointId& checkpointUpperBound) { auto future = YdbConnection->Client.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) { - - // publish nodes - NYdb::TParamsBuilder paramsBuilder; - paramsBuilder.AddParam("$graph_id").String(graphId).Build(); - paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointUpperBound.CoordinatorGeneration).Build(); - paramsBuilder.AddParam("$seq_no").Uint64(checkpointUpperBound.SeqNo).Build(); - - auto params = paramsBuilder.Build(); - + [prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) { + + // publish nodes + NYdb::TParamsBuilder paramsBuilder; + paramsBuilder.AddParam("$graph_id").String(graphId).Build(); + paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointUpperBound.CoordinatorGeneration).Build(); + paramsBuilder.AddParam("$seq_no").Uint64(checkpointUpperBound.SeqNo).Build(); + + auto params = paramsBuilder.Build(); + auto query = Sprintf(R"( --!syntax_v1 PRAGMA TablePathPrefix("%s"); - declare $graph_id as string; - declare $coordinator_generation as Uint64; - declare $seq_no as Uint64; - + declare $graph_id as string; + declare $coordinator_generation as Uint64; + declare $seq_no as Uint64; + DELETE FROM %s - WHERE graph_id = $graph_id AND - (coordinator_generation < $coordinator_generation OR - (coordinator_generation = $coordinator_generation AND seq_no < $seq_no)); - )", prefix.c_str(), StatesTable); + WHERE graph_id = $graph_id AND + (coordinator_generation < $coordinator_generation OR + (coordinator_generation = $coordinator_generation AND seq_no < $seq_no)); + )", prefix.c_str(), StatesTable); auto future = session.ExecuteDataQuery( query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params, - thisPtr->DefaultExecDataQuerySettings()); + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params, + thisPtr->DefaultExecDataQuerySettings()); return future.Apply( [] (const TFuture<TDataQueryResult>& future) { @@ -460,9 +460,9 @@ TFuture<TIssues> TStateStorage::DeleteCheckpoints( return StatusToIssues(future); } -TFuture<TDataQueryResult> TStateStorage::SelectState(const TContextPtr& context) -{ - NYdb::TParamsBuilder paramsBuilder; +TFuture<TDataQueryResult> TStateStorage::SelectState(const TContextPtr& context) +{ + NYdb::TParamsBuilder paramsBuilder; Y_VERIFY(!context->TaskIds.empty()); if (context->TaskIds.size() == 1) { paramsBuilder.AddParam("$task_id").Uint64(context->TaskIds[0]).Build(); @@ -473,83 +473,83 @@ TFuture<TDataQueryResult> TStateStorage::SelectState(const TContextPtr& context) } taskIdsParam.EndList().Build(); } - paramsBuilder.AddParam("$graph_id").String(context->GraphId).Build(); - paramsBuilder.AddParam("$coordinator_generation").Uint64(context->CheckpointId.CoordinatorGeneration).Build(); - paramsBuilder.AddParam("$seq_no").Uint64(context->CheckpointId.SeqNo).Build(); - - auto params = paramsBuilder.Build(); - - auto query = Sprintf(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("%s"); - + paramsBuilder.AddParam("$graph_id").String(context->GraphId).Build(); + paramsBuilder.AddParam("$coordinator_generation").Uint64(context->CheckpointId.CoordinatorGeneration).Build(); + paramsBuilder.AddParam("$seq_no").Uint64(context->CheckpointId.SeqNo).Build(); + + auto params = paramsBuilder.Build(); + + auto query = Sprintf(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("%s"); + %s; DECLARE $graph_id AS string; DECLARE $coordinator_generation AS Uint64; DECLARE $seq_no AS Uint64; - + SELECT task_id, blob - FROM %s + FROM %s WHERE %s AND graph_id = $graph_id AND coordinator_generation = $coordinator_generation AND seq_no = $seq_no; )", context->TablePathPrefix.c_str(), context->TaskIds.size() == 1 ? "DECLARE $task_id AS Uint64" : "DECLARE $task_ids AS List<Uint64>", StatesTable, context->TaskIds.size() == 1 ? "task_id = $task_id" : "task_id IN $task_ids"); - + Y_VERIFY(context->Session); return context->Session->ExecuteDataQuery( - query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params, - DefaultExecDataQuerySettings()); -} - -TFuture<TStatus> TStateStorage::UpsertState(const TContextPtr& context) { + query, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params, + DefaultExecDataQuerySettings()); +} + +TFuture<TStatus> TStateStorage::UpsertState(const TContextPtr& context) { Y_VERIFY(context->States.size() == 1); TString serializedState; if (!context->States[0].SerializeToString(&serializedState)) { return MakeFuture(MakeErrorStatus(EStatus::BAD_REQUEST, "Failed to serialize compute actor state", NYql::TSeverityIds::S_ERROR)); } - - // publish nodes - NYdb::TParamsBuilder paramsBuilder; + + // publish nodes + NYdb::TParamsBuilder paramsBuilder; Y_VERIFY(context->TaskIds.size() == 1); paramsBuilder.AddParam("$task_id").Uint64(context->TaskIds[0]).Build(); - paramsBuilder.AddParam("$graph_id").String(context->GraphId).Build(); - paramsBuilder.AddParam("$coordinator_generation").Uint64(context->CheckpointId.CoordinatorGeneration).Build(); - paramsBuilder.AddParam("$seq_no").Uint64(context->CheckpointId.SeqNo).Build(); + paramsBuilder.AddParam("$graph_id").String(context->GraphId).Build(); + paramsBuilder.AddParam("$coordinator_generation").Uint64(context->CheckpointId.CoordinatorGeneration).Build(); + paramsBuilder.AddParam("$seq_no").Uint64(context->CheckpointId.SeqNo).Build(); paramsBuilder.AddParam("$blob").String(serializedState).Build(); - - auto params = paramsBuilder.Build(); - - auto query = Sprintf(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("%s"); - - declare $task_id as Uint64; - declare $graph_id as string; - declare $coordinator_generation as Uint64; - declare $seq_no as Uint64; - declare $blob as string; - - UPSERT INTO %s (task_id, graph_id, coordinator_generation, seq_no, blob) VALUES - ($task_id, $graph_id, $coordinator_generation, $seq_no, $blob); - )", context->TablePathPrefix.c_str(), StatesTable); - + + auto params = paramsBuilder.Build(); + + auto query = Sprintf(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("%s"); + + declare $task_id as Uint64; + declare $graph_id as string; + declare $coordinator_generation as Uint64; + declare $seq_no as Uint64; + declare $blob as string; + + UPSERT INTO %s (task_id, graph_id, coordinator_generation, seq_no, blob) VALUES + ($task_id, $graph_id, $coordinator_generation, $seq_no, $blob); + )", context->TablePathPrefix.c_str(), StatesTable); + Y_VERIFY(context->Session); auto future = context->Session->ExecuteDataQuery( - query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params, - DefaultExecDataQuerySettings()); - return future.Apply( - [] (const TFuture<TDataQueryResult>& future) { - TStatus status = future.GetValue(); - return status; - }); -} - + query, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params, + DefaultExecDataQuerySettings()); + return future.Apply( + [] (const TFuture<TDataQueryResult>& future) { + TStatus status = future.GetValue(); + return status; + }); +} + } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp index d9d354c2e5..1c2ab3ea06 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp @@ -1,20 +1,20 @@ #include "utils.h" -#include "checkpoint_coordinator.h" - +#include "checkpoint_coordinator.h" + #include <ydb/core/yq/libs/actors/logging/log.h> - -#include <library/cpp/actors/core/actor.h> -#include <library/cpp/actors/core/hfunc.h> - + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> + #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h> #include <ydb/library/yql/dq/actors/dq.h> #include <ydb/library/yql/dq/state/dq_state_load_plan.h> - + #include <util/string/builder.h> -#include <utility> - +#include <utility> + #define CC_LOG_D(stream) \ LOG_STREAMS_CHECKPOINT_COORDINATOR_DEBUG("[" << CoordinatorId << "] " << stream) #define CC_LOG_I(stream) \ @@ -25,31 +25,31 @@ LOG_STREAMS_CHECKPOINT_COORDINATOR_ERROR("[" << CoordinatorId << "] " << stream) namespace NYq { - -TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId, - const TActorId& taskControllerId, + +TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId, + const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, - const NMonitoring::TDynamicCounterPtr& counters, + const NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition) - : CoordinatorId(std::move(coordinatorId)) - , TaskControllerId(taskControllerId) - , StorageProxy(storageProxy) + : CoordinatorId(std::move(coordinatorId)) + , TaskControllerId(taskControllerId) + , StorageProxy(storageProxy) , RunActorId(runActorId) - , Settings(settings) - , CheckpointingPeriod(TDuration::MilliSeconds(Settings.GetCheckpointingPeriodMillis())) + , Settings(settings) + , CheckpointingPeriod(TDuration::MilliSeconds(Settings.GetCheckpointingPeriodMillis())) , GraphParams(graphParams) - , Metrics(TCheckpointCoordinatorMetrics(counters)) + , Metrics(TCheckpointCoordinatorMetrics(counters)) , StateLoadMode(stateLoadMode) , StreamingDisposition(streamingDisposition) { -} - -void TCheckpointCoordinator::Bootstrap() { - Become(&TThis::DispatchEvent); +} + +void TCheckpointCoordinator::Bootstrap() { + Become(&TThis::DispatchEvent); CC_LOG_D("Bootstrapped with streaming disposition " << StreamingDisposition << " and state load mode " << YandexQuery::StateLoadMode_Name(StateLoadMode)); } @@ -91,37 +91,37 @@ void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) { PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size()); CC_LOG_D("Send TEvRegisterCoordinatorRequest"); - Send(StorageProxy, new TEvCheckpointStorage::TEvRegisterCoordinatorRequest(CoordinatorId), IEventHandle::FlagTrackDelivery); -} - -void TCheckpointCoordinator::ScheduleNextCheckpoint() { - Schedule(CheckpointingPeriod, new TEvCheckpointCoordinator::TEvScheduleCheckpointing()); -} - -void TCheckpointCoordinator::UpdateInProgressMetric() { - const auto pending = PendingCheckpoints.size(); - const auto pendingCommit = PendingCommitCheckpoints.size(); - Metrics.Pending->Set(pending); - Metrics.PendingCommit->Set(pendingCommit); - Metrics.InProgress->Set(pending + pendingCommit); -} - -void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr& ev) { + Send(StorageProxy, new TEvCheckpointStorage::TEvRegisterCoordinatorRequest(CoordinatorId), IEventHandle::FlagTrackDelivery); +} + +void TCheckpointCoordinator::ScheduleNextCheckpoint() { + Schedule(CheckpointingPeriod, new TEvCheckpointCoordinator::TEvScheduleCheckpointing()); +} + +void TCheckpointCoordinator::UpdateInProgressMetric() { + const auto pending = PendingCheckpoints.size(); + const auto pendingCommit = PendingCommitCheckpoints.size(); + Metrics.Pending->Set(pending); + Metrics.PendingCommit->Set(pendingCommit); + Metrics.InProgress->Set(pending + pendingCommit); +} + +void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr& ev) { CC_LOG_D("Got TEvRegisterCoordinatorResponse; issues: " << ev->Get()->Issues.ToOneLineString()); - const auto& issues = ev->Get()->Issues; - if (issues) { + const auto& issues = ev->Get()->Issues; + if (issues) { auto message = "Can't register in storage: " + issues.ToOneLineString(); CC_LOG_E(message); - ++*Metrics.StorageError; - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message)); - return; - } - + ++*Metrics.StorageError; + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message)); + return; + } + CC_LOG_D("Successfully registered in storage"); CC_LOG_I("Send TEvNewCheckpointCoordinator to " << AllActors.size() << " actor(s)"); for (const auto& [actor, transport] : AllActors) { transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinator(CoordinatorId.Generation, CoordinatorId.GraphId)); - } + } const bool needCheckpointMetadata = StateLoadMode == YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT || StreamingDisposition.has_from_last_checkpoint(); if (needCheckpointMetadata) { @@ -142,9 +142,9 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord ScheduleNextCheckpoint(); } else { Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(TStringBuilder() << "Unexpected state load mode (" << YandexQuery::StateLoadMode_Name(StateLoadMode) << ") and streaming disposition " << StreamingDisposition)); - } -} - + } +} + void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck::TPtr& ev) { if (!OnComputeActorEventReceived(ev)) { return; @@ -159,42 +159,42 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpo } } -void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr& ev) { - const auto event = ev->Get(); - const auto& checkpoints = event->Checkpoints; +void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr& ev) { + const auto event = ev->Get(); + const auto& checkpoints = event->Checkpoints; CC_LOG_D("Got TEvGetCheckpointsMetadataResponse"); - Y_VERIFY(!PendingRestoreCheckpoint); - - if (event->Issues) { - ++*Metrics.StorageError; + Y_VERIFY(!PendingRestoreCheckpoint); + + if (event->Issues) { + ++*Metrics.StorageError; auto message = "Can't get checkpoints to restore: " + event->Issues.ToOneLineString(); CC_LOG_E(message); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message)); - return; - } - - Y_VERIFY(checkpoints.size() < 2); - if (!checkpoints.empty()) { - const auto& checkpoint = checkpoints.at(0); - CheckpointIdGenerator = std::make_unique<TCheckpointIdGenerator>(CoordinatorId, checkpoint.CheckpointId); + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message)); + return; + } + + Y_VERIFY(checkpoints.size() < 2); + if (!checkpoints.empty()) { + const auto& checkpoint = checkpoints.at(0); + CheckpointIdGenerator = std::make_unique<TCheckpointIdGenerator>(CoordinatorId, checkpoint.CheckpointId); const bool needRestoreOffsets = StateLoadMode == YandexQuery::StateLoadMode::EMPTY && StreamingDisposition.has_from_last_checkpoint(); if (needRestoreOffsets) { TryToRestoreOffsetsFromForeignCheckpoint(checkpoint); } else { RestoreFromOwnCheckpoint(checkpoint); - } - return; - } - + } + return; + } + // Not restored from existing checkpoint. Init zero checkpoint ++*Metrics.StartedFromEmptyCheckpoint; CheckpointIdGenerator = std::make_unique<TCheckpointIdGenerator>(CoordinatorId); CC_LOG_I("Found no checkpoints to restore from, creating a 'zero' checkpoint"); InitingZeroCheckpoint = true; - InitCheckpoint(); - ScheduleNextCheckpoint(); -} - + InitCheckpoint(); + ScheduleNextCheckpoint(); +} + void TCheckpointCoordinator::RestoreFromOwnCheckpoint(const TCheckpointMetadata& checkpoint) { CC_LOG_I("Will restore from checkpoint " << checkpoint.CheckpointId); PendingRestoreCheckpoint = TPendingRestoreCheckpoint(checkpoint.CheckpointId, checkpoint.Status == ECheckpointStatus::PendingCommit, ActorsToWaitForSet); @@ -256,77 +256,77 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe } } -void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult::TPtr& ev) { +void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult::TPtr& ev) { if (!OnComputeActorEventReceived(ev)) { return; } - const auto& record = ev->Get()->Record; - const auto& checkpointProto = record.GetCheckpoint(); - const TCheckpointId checkpoint(checkpointProto.GetGeneration(), checkpointProto.GetId()); - const auto& status = record.GetStatus(); - const TString& statusName = NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_Name(status); - CC_LOG_D("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult; taskId: "<< record.GetTaskId() - << ", checkpoint: " << checkpoint + const auto& record = ev->Get()->Record; + const auto& checkpointProto = record.GetCheckpoint(); + const TCheckpointId checkpoint(checkpointProto.GetGeneration(), checkpointProto.GetId()); + const auto& status = record.GetStatus(); + const TString& statusName = NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_Name(status); + CC_LOG_D("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult; taskId: "<< record.GetTaskId() + << ", checkpoint: " << checkpoint << ", status: " << statusName); - - if (!PendingRestoreCheckpoint) { + + if (!PendingRestoreCheckpoint) { CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint"); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint")); - return; - } - - if (PendingRestoreCheckpoint->CheckpointId != checkpoint) { + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint")); + return; + } + + if (PendingRestoreCheckpoint->CheckpointId != checkpoint) { CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult event with unexpected checkpoint: " << checkpoint << ", expected: " << PendingRestoreCheckpoint->CheckpointId); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got unexpected checkpoint")); - return; - } - - if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) { + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got unexpected checkpoint")); + return; + } + + if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) { CC_LOG_E("[" << checkpoint << "] Can't restore: " << statusName); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Aborted("Can't restore: " + statusName)); - return; - } - - PendingRestoreCheckpoint->Acknowledge(ev->Sender); + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Aborted("Can't restore: " + statusName)); + return; + } + + PendingRestoreCheckpoint->Acknowledge(ev->Sender); CC_LOG_D("[" << checkpoint << "] Task state restored, need " << PendingRestoreCheckpoint->NotYetAcknowledgedCount() << " more acks"); - - if (PendingRestoreCheckpoint->GotAllAcknowledges()) { + + if (PendingRestoreCheckpoint->GotAllAcknowledges()) { if (PendingInit) { PendingInit = nullptr; } - if (PendingRestoreCheckpoint->CommitAfterRestore) { + if (PendingRestoreCheckpoint->CommitAfterRestore) { CC_LOG_I("[" << checkpoint << "] State restored, send TEvCommitState to " << ActorsToNotify.size() << " actor(s)"); PendingCommitCheckpoints.emplace(checkpoint, TPendingCheckpoint(ActorsToNotifySet)); - UpdateInProgressMetric(); + UpdateInProgressMetric(); for (const auto& [actor, transport] : ActorsToNotify) { transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvCommitState(checkpoint.SeqNo, checkpoint.CoordinatorGeneration, CoordinatorId.Generation)); - } - } - + } + } + if (RestoringFromForeignCheckpoint) { InitingZeroCheckpoint = true; InitCheckpoint(); } - ScheduleNextCheckpoint(); + ScheduleNextCheckpoint(); CC_LOG_I("[" << checkpoint << "] State restored, send TEvRun to " << AllActors.size() << " actors"); for (const auto& [actor, transport] : AllActors) { transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvRun()); - } - } -} - -void TCheckpointCoordinator::InitCheckpoint() { - Y_VERIFY(CheckpointIdGenerator); - const auto nextCheckpointId = CheckpointIdGenerator->NextId(); + } + } +} + +void TCheckpointCoordinator::InitCheckpoint() { + Y_VERIFY(CheckpointIdGenerator); + const auto nextCheckpointId = CheckpointIdGenerator->NextId(); CC_LOG_I("[" << nextCheckpointId << "] Registering new checkpoint in storage"); - + PendingCheckpoints.emplace(nextCheckpointId, TPendingCheckpoint(ActorsToWaitForSet)); - UpdateInProgressMetric(); - ++*Metrics.Total; - + UpdateInProgressMetric(); + ++*Metrics.Total; + std::unique_ptr<TEvCheckpointStorage::TEvCreateCheckpointRequest> req; if (GraphDescId) { req = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointRequest>(CoordinatorId, nextCheckpointId, ActorsToWaitForSet.size(), GraphDescId); @@ -337,35 +337,35 @@ void TCheckpointCoordinator::InitCheckpoint() { } Send(StorageProxy, req.release(), IEventHandle::FlagTrackDelivery); -} - -void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleCheckpointing::TPtr&) { +} + +void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleCheckpointing::TPtr&) { CC_LOG_D("Got TEvScheduleCheckpointing"); - ScheduleNextCheckpoint(); - const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size(); + ScheduleNextCheckpoint(); + const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size(); if (checkpointsInFly >= Settings.GetMaxInflight() || InitingZeroCheckpoint) { CC_LOG_W("Skip schedule checkpoint event since inflight checkpoint limit exceeded: current: " << checkpointsInFly << ", limit: " << Settings.GetMaxInflight()); - Metrics.SkippedDueToInFlightLimit->Inc(); - return; - } - Metrics.SkippedDueToInFlightLimit->Set(0); - InitCheckpoint(); -} - -void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpointResponse::TPtr& ev) { - const auto& checkpointId = ev->Get()->CheckpointId; - const auto& issues = ev->Get()->Issues; + Metrics.SkippedDueToInFlightLimit->Inc(); + return; + } + Metrics.SkippedDueToInFlightLimit->Set(0); + InitCheckpoint(); +} + +void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpointResponse::TPtr& ev) { + const auto& checkpointId = ev->Get()->CheckpointId; + const auto& issues = ev->Get()->Issues; CC_LOG_D("[" << checkpointId << "] Got TEvCreateCheckpointResponse"); - - if (issues) { + + if (issues) { CC_LOG_E("[" << checkpointId << "] Can't create checkpoint: " << issues.ToOneLineString()); - PendingCheckpoints.erase(checkpointId); - UpdateInProgressMetric(); - ++*Metrics.FailedToCreate; - ++*Metrics.StorageError; - return; - } - + PendingCheckpoints.erase(checkpointId); + UpdateInProgressMetric(); + ++*Metrics.FailedToCreate; + ++*Metrics.StorageError; + return; + } + if (GraphDescId) { Y_VERIFY(GraphDescId == ev->Get()->GraphDescId); } else { @@ -381,156 +381,156 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpo } } else { InjectCheckpoint(checkpointId); - } + } } - + void TCheckpointCoordinator::InjectCheckpoint(const TCheckpointId& checkpointId) { CC_LOG_I("[" << checkpointId << "] Checkpoint successfully created, going to inject barriers to " << ActorsToTrigger.size() << " actor(s)"); for (const auto& [toTrigger, transport] : ActorsToTrigger) { transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvInjectCheckpoint(checkpointId.SeqNo, checkpointId.CoordinatorGeneration)); } - if (!GraphIsRunning) { + if (!GraphIsRunning) { CC_LOG_I("[" << checkpointId << "] Send TEvRun to all actors"); for (const auto& [actor, transport] : AllActors) { transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvRun()); - } - GraphIsRunning = true; - } -} - -void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult::TPtr& ev) { - const auto& proto = ev->Get()->Record; - const auto& checkpointProto = proto.GetCheckpoint(); - const auto& status = proto.GetStatus(); - const TString& statusName = NYql::NDqProto::TEvSaveTaskStateResult_EStatus_Name(status); - + } + GraphIsRunning = true; + } +} + +void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult::TPtr& ev) { + const auto& proto = ev->Get()->Record; + const auto& checkpointProto = proto.GetCheckpoint(); + const auto& status = proto.GetStatus(); + const TString& statusName = NYql::NDqProto::TEvSaveTaskStateResult_EStatus_Name(status); + if (!OnComputeActorEventReceived(ev)) { return; } - TCheckpointId checkpointId(checkpointProto.GetGeneration(), checkpointProto.GetId()); - - CC_LOG_D("[" << checkpointId << "] Got TEvSaveTaskStateResult; task " << proto.GetTaskId() + TCheckpointId checkpointId(checkpointProto.GetGeneration(), checkpointProto.GetId()); + + CC_LOG_D("[" << checkpointId << "] Got TEvSaveTaskStateResult; task " << proto.GetTaskId() << ", status: " << statusName << ", size: " << proto.GetStateSizeBytes()); - - const auto it = PendingCheckpoints.find(checkpointId); - if (it == PendingCheckpoints.end()) { - return; - } - auto& checkpoint = it->second; - - if (status == NYql::NDqProto::TEvSaveTaskStateResult::OK) { - checkpoint.Acknowledge(ev->Sender, proto.GetStateSizeBytes()); + + const auto it = PendingCheckpoints.find(checkpointId); + if (it == PendingCheckpoints.end()) { + return; + } + auto& checkpoint = it->second; + + if (status == NYql::NDqProto::TEvSaveTaskStateResult::OK) { + checkpoint.Acknowledge(ev->Sender, proto.GetStateSizeBytes()); CC_LOG_D("[" << checkpointId << "] Task state saved, need " << checkpoint.NotYetAcknowledgedCount() << " more acks"); - if (checkpoint.GotAllAcknowledges()) { + if (checkpoint.GotAllAcknowledges()) { CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'"); - Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId), IEventHandle::FlagTrackDelivery); + Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId), IEventHandle::FlagTrackDelivery); if (InitingZeroCheckpoint) { Send(RunActorId, new TEvCheckpointCoordinator::TEvZeroCheckpointDone()); } - } - } else { + } + } else { CC_LOG_E("[" << checkpointId << "] Can't save node state, aborting checkpoint"); - Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery); - } -} - -void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse::TPtr& ev) { - const auto& checkpointId = ev->Get()->CheckpointId; - const auto issues = ev->Get()->Issues; + Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery); + } +} + +void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse::TPtr& ev) { + const auto& checkpointId = ev->Get()->CheckpointId; + const auto issues = ev->Get()->Issues; CC_LOG_D("[" << checkpointId << "] Got TEvSetCheckpointPendingCommitStatusResponse"); - const auto it = PendingCheckpoints.find(checkpointId); - if (it == PendingCheckpoints.end()) { + const auto it = PendingCheckpoints.find(checkpointId); + if (it == PendingCheckpoints.end()) { CC_LOG_W("[" << checkpointId << "] Got TEvSetCheckpointPendingCommitStatusResponse for checkpoint but it is not in PendingCheckpoints"); - return; - } - - if (issues) { + return; + } + + if (issues) { CC_LOG_E("[" << checkpointId << "] Can't change checkpoint status to 'PendingCommit': " << issues.ToString()); - ++*Metrics.StorageError; - PendingCheckpoints.erase(it); - return; - } - + ++*Metrics.StorageError; + PendingCheckpoints.erase(it); + return; + } + CC_LOG_I("[" << checkpointId << "] Checkpoint status changed to 'PendingCommit', committing states"); PendingCommitCheckpoints.emplace(checkpointId, TPendingCheckpoint(ActorsToNotifySet, it->second.GetStats())); - PendingCheckpoints.erase(it); - UpdateInProgressMetric(); + PendingCheckpoints.erase(it); + UpdateInProgressMetric(); for (const auto& [toTrigger, transport] : ActorsToNotify) { transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvCommitState(checkpointId.SeqNo, checkpointId.CoordinatorGeneration, CoordinatorId.Generation)); - } -} - -void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvStateCommitted::TPtr& ev) { + } +} + +void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvStateCommitted::TPtr& ev) { if (!OnComputeActorEventReceived(ev)) { return; } - const auto& checkpointPb = ev->Get()->Record.GetCheckpoint(); - TCheckpointId checkpointId(checkpointPb.GetGeneration(), checkpointPb.GetId()); + const auto& checkpointPb = ev->Get()->Record.GetCheckpoint(); + TCheckpointId checkpointId(checkpointPb.GetGeneration(), checkpointPb.GetId()); CC_LOG_D("[" << checkpointId << "] Got TEvStateCommitted; task: " << ev->Get()->Record.GetTaskId()); - const auto it = PendingCommitCheckpoints.find(checkpointId); - if (it == PendingCommitCheckpoints.end()) { + const auto it = PendingCommitCheckpoints.find(checkpointId); + if (it == PendingCommitCheckpoints.end()) { CC_LOG_W("[" << checkpointId << "] Got TEvStateCommitted for checkpoint " << checkpointId << " but it is not in PendingCommitCheckpoints"); - return; - } - - auto& checkpoint = it->second; - checkpoint.Acknowledge(ev->Sender); + return; + } + + auto& checkpoint = it->second; + checkpoint.Acknowledge(ev->Sender); CC_LOG_D("[" << checkpointId << "] State committed " << ev->Sender.ToString() << ", need " << checkpoint.NotYetAcknowledgedCount() << " more acks"); - if (checkpoint.GotAllAcknowledges()) { + if (checkpoint.GotAllAcknowledges()) { CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'Completed'"); - const auto& stats = checkpoint.GetStats(); - auto durationMs = (TInstant::Now() - stats.CreatedAt).MilliSeconds(); - Metrics.LastCheckpointBarrierDeliveryTimeMillis->Set(durationMs); - Metrics.CheckpointBarrierDeliveryTimeMillis->Collect(durationMs); - Send(StorageProxy, new TEvCheckpointStorage::TEvCompleteCheckpointRequest(CoordinatorId, checkpointId), IEventHandle::FlagTrackDelivery); - } -} - -void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr& ev) { - const auto& checkpointId = ev->Get()->CheckpointId; + const auto& stats = checkpoint.GetStats(); + auto durationMs = (TInstant::Now() - stats.CreatedAt).MilliSeconds(); + Metrics.LastCheckpointBarrierDeliveryTimeMillis->Set(durationMs); + Metrics.CheckpointBarrierDeliveryTimeMillis->Collect(durationMs); + Send(StorageProxy, new TEvCheckpointStorage::TEvCompleteCheckpointRequest(CoordinatorId, checkpointId), IEventHandle::FlagTrackDelivery); + } +} + +void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr& ev) { + const auto& checkpointId = ev->Get()->CheckpointId; CC_LOG_D("[" << checkpointId << "] Got TEvCompleteCheckpointResponse"); - const auto it = PendingCommitCheckpoints.find(checkpointId); - if (it == PendingCommitCheckpoints.end()) { + const auto it = PendingCommitCheckpoints.find(checkpointId); + if (it == PendingCommitCheckpoints.end()) { CC_LOG_W("[" << checkpointId << "] Got TEvCompleteCheckpointResponse but related checkpoint is not in progress; checkpointId: " << checkpointId); - return; - } - const auto& issues = ev->Get()->Issues; - if (!issues) { - const auto& stats = it->second.GetStats(); - auto durationMs = (TInstant::Now() - stats.CreatedAt).MilliSeconds(); - Metrics.LastCheckpointDurationMillis->Set(durationMs); - Metrics.LastCheckpointSizeBytes->Set(stats.StateSize); - Metrics.CheckpointDurationMillis->Collect(durationMs); - Metrics.CheckpointSizeBytes->Collect(stats.StateSize); - ++*Metrics.Completed; + return; + } + const auto& issues = ev->Get()->Issues; + if (!issues) { + const auto& stats = it->second.GetStats(); + auto durationMs = (TInstant::Now() - stats.CreatedAt).MilliSeconds(); + Metrics.LastCheckpointDurationMillis->Set(durationMs); + Metrics.LastCheckpointSizeBytes->Set(stats.StateSize); + Metrics.CheckpointDurationMillis->Collect(durationMs); + Metrics.CheckpointSizeBytes->Collect(stats.StateSize); + ++*Metrics.Completed; CC_LOG_I("[" << checkpointId << "] Checkpoint completed"); - } else { - ++*Metrics.StorageError; + } else { + ++*Metrics.StorageError; CC_LOG_E("[" << checkpointId << "] Can't change checkpoint status to 'Completed': " << issues.ToString()); - } - PendingCommitCheckpoints.erase(it); - UpdateInProgressMetric(); -} - -void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr& ev) { - const auto& checkpointId = ev->Get()->CheckpointId; + } + PendingCommitCheckpoints.erase(it); + UpdateInProgressMetric(); +} + +void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr& ev) { + const auto& checkpointId = ev->Get()->CheckpointId; CC_LOG_D("[" << checkpointId << "] Got TEvAbortCheckpointResponse"); - const auto& issues = ev->Get()->Issues; - if (issues) { + const auto& issues = ev->Get()->Issues; + if (issues) { CC_LOG_E("[" << checkpointId << "] Can't abort checkpoint: " << issues.ToString()); - ++*Metrics.StorageError; - } else { + ++*Metrics.StorageError; + } else { CC_LOG_W("[" << checkpointId << "] Checkpoint aborted"); - ++*Metrics.Aborted; - } - PendingCheckpoints.erase(checkpointId); - PendingCommitCheckpoints.erase(checkpointId); - UpdateInProgressMetric(); -} - + ++*Metrics.Aborted; + } + PendingCheckpoints.erase(checkpointId); + PendingCommitCheckpoints.erase(checkpointId); + UpdateInProgressMetric(); +} + void TCheckpointCoordinator::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) { const auto actorIt = TaskIdToActor.find(ev->Get()->EventQueueId); Y_VERIFY(actorIt != TaskIdToActor.end()); @@ -561,14 +561,14 @@ void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) { PassAway(); } -void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { - TStringStream message; - message << "Got TEvUndelivered; reason: " << ev->Get()->Reason << ", sourceType: " << ev->Get()->SourceType; +void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { + TStringStream message; + message << "Got TEvUndelivered; reason: " << ev->Get()->Reason << ", sourceType: " << ev->Get()->SourceType; CC_LOG_D(message.Str()); Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Unavailable(message.Str())); - PassAway(); -} - + PassAway(); +} + void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&) { InitingZeroCheckpoint = false; // TODO: run graph only now, not before zero checkpoint inited diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h index 0bb76685e1..5c820f903d 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h @@ -1,92 +1,92 @@ -#pragma once - -#include "checkpoint_id_generator.h" -#include "pending_checkpoint.h" - +#pragma once + +#include "checkpoint_id_generator.h" +#include "pending_checkpoint.h" + #include <ydb/core/yq/libs/checkpointing/events/events.h> #include <ydb/core/yq/libs/checkpointing_common/defs.h> #include <ydb/core/yq/libs/checkpoint_storage/events/events.h> - + #include <ydb/core/yq/libs/config/protos/checkpoint_coordinator.pb.h> #include <ydb/public/api/protos/yq.pb.h> - + #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/library/yql/dq/actors/compute/retry_queue.h> #include <ydb/library/yql/providers/dq/actors/events.h> - -#include <library/cpp/actors/core/actor_bootstrapped.h> - + +#include <library/cpp/actors/core/actor_bootstrapped.h> + namespace NYq { - -using namespace NActors; + +using namespace NActors; using namespace NYq::NConfig; - -class TCheckpointCoordinator : public TActorBootstrapped<TCheckpointCoordinator> { -public: - TCheckpointCoordinator(TCoordinatorId coordinatorId, - const TActorId& taskControllerId, + +class TCheckpointCoordinator : public TActorBootstrapped<TCheckpointCoordinator> { +public: + TCheckpointCoordinator(TCoordinatorId coordinatorId, + const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, - const NMonitoring::TDynamicCounterPtr& counters, + const NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition); - + void Handle(const NYql::NDqs::TEvReadyState::TPtr&); - void Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr&); + void Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr&); void Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck::TPtr&); - void Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr&); - void Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult::TPtr&); - void Handle(const TEvCheckpointCoordinator::TEvScheduleCheckpointing::TPtr&); - void Handle(const TEvCheckpointStorage::TEvCreateCheckpointResponse::TPtr&); - void Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult::TPtr&); - void Handle(const TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse::TPtr&); - void Handle(const NYql::NDq::TEvDqCompute::TEvStateCommitted::TPtr&); - void Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr&); - void Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr&); + void Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr&); + void Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult::TPtr&); + void Handle(const TEvCheckpointCoordinator::TEvScheduleCheckpointing::TPtr&); + void Handle(const TEvCheckpointStorage::TEvCreateCheckpointResponse::TPtr&); + void Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult::TPtr&); + void Handle(const TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse::TPtr&); + void Handle(const NYql::NDq::TEvDqCompute::TEvStateCommitted::TPtr&); + void Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr&); + void Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr&); void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev); void Handle(NActors::TEvents::TEvPoison::TPtr&); - void Handle(NActors::TEvents::TEvUndelivered::TPtr&); + void Handle(NActors::TEvents::TEvUndelivered::TPtr&); void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev); void Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&); - - - STRICT_STFUNC(DispatchEvent, + + + STRICT_STFUNC(DispatchEvent, hFunc(NYql::NDqs::TEvReadyState, Handle) - hFunc(TEvCheckpointStorage::TEvRegisterCoordinatorResponse, Handle) + hFunc(TEvCheckpointStorage::TEvRegisterCoordinatorResponse, Handle) hFunc(NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck, Handle) - hFunc(TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse, Handle) - hFunc(NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult, Handle) - hFunc(TEvCheckpointCoordinator::TEvScheduleCheckpointing, Handle) - hFunc(TEvCheckpointStorage::TEvCreateCheckpointResponse, Handle) - hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult, Handle) - hFunc(TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse, Handle) - hFunc(NYql::NDq::TEvDqCompute::TEvStateCommitted, Handle) - hFunc(TEvCheckpointStorage::TEvCompleteCheckpointResponse, Handle) - hFunc(TEvCheckpointStorage::TEvAbortCheckpointResponse, Handle) + hFunc(TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse, Handle) + hFunc(NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult, Handle) + hFunc(TEvCheckpointCoordinator::TEvScheduleCheckpointing, Handle) + hFunc(TEvCheckpointStorage::TEvCreateCheckpointResponse, Handle) + hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult, Handle) + hFunc(TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse, Handle) + hFunc(NYql::NDq::TEvDqCompute::TEvStateCommitted, Handle) + hFunc(TEvCheckpointStorage::TEvCompleteCheckpointResponse, Handle) + hFunc(TEvCheckpointStorage::TEvAbortCheckpointResponse, Handle) hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle) - hFunc(NActors::TEvents::TEvPoison, Handle) - hFunc(NActors::TEvents::TEvUndelivered, Handle) + hFunc(NActors::TEvents::TEvPoison, Handle) + hFunc(NActors::TEvents::TEvUndelivered, Handle) hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle) hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle) hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle) - ) - - void Bootstrap(); - + ) + + void Bootstrap(); + static constexpr char ActorName[] = "YQ_CHECKPOINT_COORDINATOR"; -private: - void InitCheckpoint(); +private: + void InitCheckpoint(); void InjectCheckpoint(const TCheckpointId& checkpointId); - void ScheduleNextCheckpoint(); - void UpdateInProgressMetric(); + void ScheduleNextCheckpoint(); + void UpdateInProgressMetric(); void PassAway() override; void RestoreFromOwnCheckpoint(const TCheckpointMetadata& checkpoint); void TryToRestoreOffsetsFromForeignCheckpoint(const TCheckpointMetadata& checkpoint); - + template <class TEvPtr> bool OnComputeActorEventReceived(TEvPtr& ev) { const auto actorIt = AllActors.find(ev->Sender); @@ -94,65 +94,65 @@ private: return actorIt->second->EventsQueue.OnEventReceived(ev); } - struct TCheckpointCoordinatorMetrics { - TCheckpointCoordinatorMetrics(const NMonitoring::TDynamicCounterPtr& counters) { - auto subgroup = counters->GetSubgroup("subsystem", "checkpoint_coordinator"); - InProgress = subgroup->GetCounter("InProgress"); - Pending = subgroup->GetCounter("Pending"); - PendingCommit = subgroup->GetCounter("PendingCommit"); - Completed = subgroup->GetCounter("CompletedCheckpoints", true); - Aborted = subgroup->GetCounter("AbortedCheckpoints", true); - StorageError = subgroup->GetCounter("StorageError", true); - FailedToCreate = subgroup->GetCounter("FailedToCreate", true); - Total = subgroup->GetCounter("TotalCheckpoints", true); - LastCheckpointBarrierDeliveryTimeMillis = subgroup->GetCounter("LastCheckpointBarrierDeliveryTimeMillis"); - LastCheckpointDurationMillis = subgroup->GetCounter("LastSuccessfulCheckpointDurationMillis"); - LastCheckpointSizeBytes = subgroup->GetCounter("LastSuccessfulCheckpointSizeBytes"); - CheckpointBarrierDeliveryTimeMillis = subgroup->GetHistogram("CheckpointBarrierDeliveryTimeMillis", NMonitoring::ExponentialHistogram(12, 2, 1024)); // ~ 1s -> ~ 1 h - CheckpointDurationMillis = subgroup->GetHistogram("CheckpointDurationMillis", NMonitoring::ExponentialHistogram(12, 2, 1024)); // ~ 1s -> ~ 1 h - CheckpointSizeBytes = subgroup->GetHistogram("CheckpointSizeBytes", NMonitoring::ExponentialHistogram(8, 32, 32)); // 32b -> 1Tb - SkippedDueToInFlightLimit = subgroup->GetCounter("SkippedDueToInFlightLimit"); + struct TCheckpointCoordinatorMetrics { + TCheckpointCoordinatorMetrics(const NMonitoring::TDynamicCounterPtr& counters) { + auto subgroup = counters->GetSubgroup("subsystem", "checkpoint_coordinator"); + InProgress = subgroup->GetCounter("InProgress"); + Pending = subgroup->GetCounter("Pending"); + PendingCommit = subgroup->GetCounter("PendingCommit"); + Completed = subgroup->GetCounter("CompletedCheckpoints", true); + Aborted = subgroup->GetCounter("AbortedCheckpoints", true); + StorageError = subgroup->GetCounter("StorageError", true); + FailedToCreate = subgroup->GetCounter("FailedToCreate", true); + Total = subgroup->GetCounter("TotalCheckpoints", true); + LastCheckpointBarrierDeliveryTimeMillis = subgroup->GetCounter("LastCheckpointBarrierDeliveryTimeMillis"); + LastCheckpointDurationMillis = subgroup->GetCounter("LastSuccessfulCheckpointDurationMillis"); + LastCheckpointSizeBytes = subgroup->GetCounter("LastSuccessfulCheckpointSizeBytes"); + CheckpointBarrierDeliveryTimeMillis = subgroup->GetHistogram("CheckpointBarrierDeliveryTimeMillis", NMonitoring::ExponentialHistogram(12, 2, 1024)); // ~ 1s -> ~ 1 h + CheckpointDurationMillis = subgroup->GetHistogram("CheckpointDurationMillis", NMonitoring::ExponentialHistogram(12, 2, 1024)); // ~ 1s -> ~ 1 h + CheckpointSizeBytes = subgroup->GetHistogram("CheckpointSizeBytes", NMonitoring::ExponentialHistogram(8, 32, 32)); // 32b -> 1Tb + SkippedDueToInFlightLimit = subgroup->GetCounter("SkippedDueToInFlightLimit"); RestoredFromSavedCheckpoint = subgroup->GetCounter("RestoredFromSavedCheckpoint", true); StartedFromEmptyCheckpoint = subgroup->GetCounter("StartedFromEmptyCheckpoint", true); RestoredStreamingOffsetsFromCheckpoint = subgroup->GetCounter("RestoredStreamingOffsetsFromCheckpoint", true); - } - - NMonitoring::TDynamicCounters::TCounterPtr InProgress; - NMonitoring::TDynamicCounters::TCounterPtr Pending; - NMonitoring::TDynamicCounters::TCounterPtr PendingCommit; - NMonitoring::TDynamicCounters::TCounterPtr Completed; - NMonitoring::TDynamicCounters::TCounterPtr Aborted; - NMonitoring::TDynamicCounters::TCounterPtr StorageError; - NMonitoring::TDynamicCounters::TCounterPtr FailedToCreate; - NMonitoring::TDynamicCounters::TCounterPtr Total; - NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointBarrierDeliveryTimeMillis; - NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointDurationMillis; - NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointSizeBytes; - NMonitoring::TDynamicCounters::TCounterPtr SkippedDueToInFlightLimit; + } + + NMonitoring::TDynamicCounters::TCounterPtr InProgress; + NMonitoring::TDynamicCounters::TCounterPtr Pending; + NMonitoring::TDynamicCounters::TCounterPtr PendingCommit; + NMonitoring::TDynamicCounters::TCounterPtr Completed; + NMonitoring::TDynamicCounters::TCounterPtr Aborted; + NMonitoring::TDynamicCounters::TCounterPtr StorageError; + NMonitoring::TDynamicCounters::TCounterPtr FailedToCreate; + NMonitoring::TDynamicCounters::TCounterPtr Total; + NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointBarrierDeliveryTimeMillis; + NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointDurationMillis; + NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointSizeBytes; + NMonitoring::TDynamicCounters::TCounterPtr SkippedDueToInFlightLimit; NMonitoring::TDynamicCounters::TCounterPtr RestoredFromSavedCheckpoint; NMonitoring::TDynamicCounters::TCounterPtr StartedFromEmptyCheckpoint; NMonitoring::TDynamicCounters::TCounterPtr RestoredStreamingOffsetsFromCheckpoint; - NMonitoring::THistogramPtr CheckpointBarrierDeliveryTimeMillis; - NMonitoring::THistogramPtr CheckpointDurationMillis; - NMonitoring::THistogramPtr CheckpointSizeBytes; - }; - + NMonitoring::THistogramPtr CheckpointBarrierDeliveryTimeMillis; + NMonitoring::THistogramPtr CheckpointDurationMillis; + NMonitoring::THistogramPtr CheckpointSizeBytes; + }; + struct TComputeActorTransportStuff : public TSimpleRefCount<TComputeActorTransportStuff> { using TPtr = TIntrusivePtr<TComputeActorTransportStuff>; NYql::NDq::TRetryEventsQueue EventsQueue; }; - const TCoordinatorId CoordinatorId; - const TActorId TaskControllerId; - const TActorId StorageProxy; + const TCoordinatorId CoordinatorId; + const TActorId TaskControllerId; + const TActorId StorageProxy; const TActorId RunActorId; - std::unique_ptr<TCheckpointIdGenerator> CheckpointIdGenerator; + std::unique_ptr<TCheckpointIdGenerator> CheckpointIdGenerator; TCheckpointCoordinatorConfig Settings; - const TDuration CheckpointingPeriod; + const TDuration CheckpointingPeriod; const NProto::TGraphParams GraphParams; TString GraphDescId; - + THashMap<TActorId, TComputeActorTransportStuff::TPtr> AllActors; THashSet<TActorId> AllActorsSet; THashMap<TActorId, TComputeActorTransportStuff::TPtr> ActorsToTrigger; @@ -161,20 +161,20 @@ private: THashMap<TActorId, TComputeActorTransportStuff::TPtr> ActorsToNotify; THashSet<TActorId> ActorsToNotifySet; THashMap<ui64, TActorId> TaskIdToActor; // Task id -> actor. - THashMap<TCheckpointId, TPendingCheckpoint, TCheckpointIdHash> PendingCheckpoints; - THashMap<TCheckpointId, TPendingCheckpoint, TCheckpointIdHash> PendingCommitCheckpoints; - TMaybe<TPendingRestoreCheckpoint> PendingRestoreCheckpoint; + THashMap<TCheckpointId, TPendingCheckpoint, TCheckpointIdHash> PendingCheckpoints; + THashMap<TCheckpointId, TPendingCheckpoint, TCheckpointIdHash> PendingCommitCheckpoints; + TMaybe<TPendingRestoreCheckpoint> PendingRestoreCheckpoint; std::unique_ptr<TPendingInitCoordinator> PendingInit; - bool GraphIsRunning = false; + bool GraphIsRunning = false; bool InitingZeroCheckpoint = false; bool RestoringFromForeignCheckpoint = false; - - TCheckpointCoordinatorMetrics Metrics; - - YandexQuery::StateLoadMode StateLoadMode; + + TCheckpointCoordinatorMetrics Metrics; + + YandexQuery::StateLoadMode StateLoadMode; YandexQuery::StreamingDisposition StreamingDisposition; -}; - +}; + THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& executerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT, const YandexQuery::StreamingDisposition& streamingDisposition = {}); } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp index 6b415e8ac8..301ea0ac98 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp +++ b/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp @@ -1,23 +1,23 @@ -#include "checkpoint_id_generator.h" +#include "checkpoint_id_generator.h" #include <ydb/core/yq/libs/checkpointing_common/defs.h> namespace NYq { - -TCheckpointIdGenerator::TCheckpointIdGenerator(TCoordinatorId coordinatorId, TCheckpointId lastCheckpoint) - : CoordinatorId(std::move(coordinatorId)) { - if (CoordinatorId.Generation > lastCheckpoint.CoordinatorGeneration) { - NextNumber = 1; - } else if (CoordinatorId.Generation == lastCheckpoint.CoordinatorGeneration) { - NextNumber = lastCheckpoint.SeqNo + 1; - } else { - ythrow yexception() << "Unexpected CheckpointCoordinator generation: " << CoordinatorId.Generation - << " while last checkpoint has " << lastCheckpoint.CoordinatorGeneration; - } -} - + +TCheckpointIdGenerator::TCheckpointIdGenerator(TCoordinatorId coordinatorId, TCheckpointId lastCheckpoint) + : CoordinatorId(std::move(coordinatorId)) { + if (CoordinatorId.Generation > lastCheckpoint.CoordinatorGeneration) { + NextNumber = 1; + } else if (CoordinatorId.Generation == lastCheckpoint.CoordinatorGeneration) { + NextNumber = lastCheckpoint.SeqNo + 1; + } else { + ythrow yexception() << "Unexpected CheckpointCoordinator generation: " << CoordinatorId.Generation + << " while last checkpoint has " << lastCheckpoint.CoordinatorGeneration; + } +} + TCheckpointId NYq::TCheckpointIdGenerator::NextId() { return TCheckpointId(CoordinatorId.Generation, NextNumber++); -} - +} + } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h b/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h index bcc295cf33..d902553c79 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h +++ b/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h @@ -1,18 +1,18 @@ -#pragma once - +#pragma once + #include <ydb/core/yq/libs/checkpointing_common/defs.h> - + namespace NYq { - -class TCheckpointIdGenerator { -private: + +class TCheckpointIdGenerator { +private: TCoordinatorId CoordinatorId; - ui64 NextNumber; - -public: - explicit TCheckpointIdGenerator(TCoordinatorId id, TCheckpointId lastCheckpoint = TCheckpointId(0, 0)); - + ui64 NextNumber; + +public: + explicit TCheckpointIdGenerator(TCoordinatorId id, TCheckpointId lastCheckpoint = TCheckpointId(0, 0)); + TCheckpointId NextId(); -}; - +}; + } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp b/ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp index 01c0204e46..888580c210 100644 --- a/ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp +++ b/ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp @@ -1,51 +1,51 @@ -#include "pending_checkpoint.h" - +#include "pending_checkpoint.h" + namespace NYq { - -TPendingCheckpoint::TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats) - : NotYetAcknowledged(std::move(toBeAcknowledged)) - , Stats(std::move(stats)) { -} - -void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId) { - Acknowledge(actorId, 0); -} - -void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId, ui64 stateSize) { - NotYetAcknowledged.erase(actorId); - Stats.StateSize += stateSize; -} - -bool TPendingCheckpoint::GotAllAcknowledges() const { - return NotYetAcknowledged.empty(); -} - -size_t TPendingCheckpoint::NotYetAcknowledgedCount() const { - return NotYetAcknowledged.size(); -} - -const TPendingCheckpointStats& TPendingCheckpoint::GetStats() const { - return Stats; -} - -TPendingRestoreCheckpoint::TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged) - : CheckpointId(checkpointId) - , CommitAfterRestore(commitAfterRestore) - , NotYetAcknowledged(std::move(toBeAcknowledged)) { -} - -void TPendingRestoreCheckpoint::Acknowledge(const NActors::TActorId& actorId) { - NotYetAcknowledged.erase(actorId); -} - -bool TPendingRestoreCheckpoint::GotAllAcknowledges() const { - return NotYetAcknowledged.empty(); -} - -size_t TPendingRestoreCheckpoint::NotYetAcknowledgedCount() const { - return NotYetAcknowledged.size(); -} - + +TPendingCheckpoint::TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats) + : NotYetAcknowledged(std::move(toBeAcknowledged)) + , Stats(std::move(stats)) { +} + +void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId) { + Acknowledge(actorId, 0); +} + +void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId, ui64 stateSize) { + NotYetAcknowledged.erase(actorId); + Stats.StateSize += stateSize; +} + +bool TPendingCheckpoint::GotAllAcknowledges() const { + return NotYetAcknowledged.empty(); +} + +size_t TPendingCheckpoint::NotYetAcknowledgedCount() const { + return NotYetAcknowledged.size(); +} + +const TPendingCheckpointStats& TPendingCheckpoint::GetStats() const { + return Stats; +} + +TPendingRestoreCheckpoint::TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged) + : CheckpointId(checkpointId) + , CommitAfterRestore(commitAfterRestore) + , NotYetAcknowledged(std::move(toBeAcknowledged)) { +} + +void TPendingRestoreCheckpoint::Acknowledge(const NActors::TActorId& actorId) { + NotYetAcknowledged.erase(actorId); +} + +bool TPendingRestoreCheckpoint::GotAllAcknowledges() const { + return NotYetAcknowledged.empty(); +} + +size_t TPendingRestoreCheckpoint::NotYetAcknowledgedCount() const { + return NotYetAcknowledged.size(); +} + void TPendingInitCoordinator::OnNewCheckpointCoordinatorAck() { ++NewCheckpointCoordinatorAcksGot; Y_VERIFY(NewCheckpointCoordinatorAcksGot <= ActorsCount); diff --git a/ydb/core/yq/libs/checkpointing/pending_checkpoint.h b/ydb/core/yq/libs/checkpointing/pending_checkpoint.h index bc42b48018..3ce3ec1431 100644 --- a/ydb/core/yq/libs/checkpointing/pending_checkpoint.h +++ b/ydb/core/yq/libs/checkpointing/pending_checkpoint.h @@ -1,56 +1,56 @@ #pragma once #include <ydb/core/yq/libs/checkpointing_common/defs.h> - -#include <library/cpp/actors/core/actor.h> - + +#include <library/cpp/actors/core/actor.h> + namespace NYq { - -struct TPendingCheckpointStats { - const TInstant CreatedAt = TInstant::Now(); - ui64 StateSize = 0; -}; - -class TPendingCheckpoint { - THashSet<NActors::TActorId> NotYetAcknowledged; - TPendingCheckpointStats Stats; - -public: - explicit TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats = TPendingCheckpointStats()); - - void Acknowledge(const NActors::TActorId& actorId); - - void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize); - - [[nodiscard]] - bool GotAllAcknowledges() const; - - [[nodiscard]] - size_t NotYetAcknowledgedCount() const; - - [[nodiscard]] - const TPendingCheckpointStats& GetStats() const; -}; - -class TPendingRestoreCheckpoint { -public: - TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged); - - void Acknowledge(const NActors::TActorId& actorId); - - [[nodiscard]] - bool GotAllAcknowledges() const; - - [[nodiscard]] - size_t NotYetAcknowledgedCount() const; - -public: - TCheckpointId CheckpointId; - bool CommitAfterRestore; - -private: - THashSet<NActors::TActorId> NotYetAcknowledged; -}; - + +struct TPendingCheckpointStats { + const TInstant CreatedAt = TInstant::Now(); + ui64 StateSize = 0; +}; + +class TPendingCheckpoint { + THashSet<NActors::TActorId> NotYetAcknowledged; + TPendingCheckpointStats Stats; + +public: + explicit TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats = TPendingCheckpointStats()); + + void Acknowledge(const NActors::TActorId& actorId); + + void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize); + + [[nodiscard]] + bool GotAllAcknowledges() const; + + [[nodiscard]] + size_t NotYetAcknowledgedCount() const; + + [[nodiscard]] + const TPendingCheckpointStats& GetStats() const; +}; + +class TPendingRestoreCheckpoint { +public: + TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged); + + void Acknowledge(const NActors::TActorId& actorId); + + [[nodiscard]] + bool GotAllAcknowledges() const; + + [[nodiscard]] + size_t NotYetAcknowledgedCount() const; + +public: + TCheckpointId CheckpointId; + bool CommitAfterRestore; + +private: + THashSet<NActors::TActorId> NotYetAcknowledged; +}; + class TPendingInitCoordinator { public: explicit TPendingInitCoordinator(size_t actorsCount) diff --git a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp index ceb0a9faf5..77deb2ff61 100644 --- a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp +++ b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp @@ -2,226 +2,226 @@ #include <ydb/core/yq/libs/graph_params/proto/graph_params.pb.h> #include <ydb/core/testlib/actors/test_runtime.h> #include <ydb/core/testlib/basics/helpers.h> - -#include <library/cpp/testing/unittest/registar.h> -#include <library/cpp/actors/core/executor_pool_basic.h> -#include <library/cpp/actors/core/scheduler_basic.h> - -namespace { - -using namespace NKikimr; + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/scheduler_basic.h> + +namespace { + +using namespace NKikimr; using namespace NYq; - + enum ETestGraphFlags : ui64 { InputWithSource = 1, SourceWithChannelInOneTask = 2, }; NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) { - + NYql::NDqProto::TReadyState result; - auto* ingress = result.AddTask(); - ingress->SetId(1); - auto* ingressOutput = ingress->AddOutputs(); - ingressOutput->AddChannels(); + auto* ingress = result.AddTask(); + ingress->SetId(1); + auto* ingressOutput = ingress->AddOutputs(); + ingressOutput->AddChannels(); if (flags & ETestGraphFlags::InputWithSource) { - auto* source = ingress->AddInputs()->MutableSource(); - source->SetType("PqSource"); + auto* source = ingress->AddInputs()->MutableSource(); + source->SetType("PqSource"); } - - auto* map = result.AddTask(); - map->SetId(2); - auto* mapInput = map->AddInputs(); - mapInput->AddChannels(); - auto* mapOutput = map->AddOutputs(); - mapOutput->AddChannels(); + + auto* map = result.AddTask(); + map->SetId(2); + auto* mapInput = map->AddInputs(); + mapInput->AddChannels(); + auto* mapOutput = map->AddOutputs(); + mapOutput->AddChannels(); if (flags & ETestGraphFlags::SourceWithChannelInOneTask) { - auto* source = map->AddInputs()->MutableSource(); - source->SetType("PqSource"); + auto* source = map->AddInputs()->MutableSource(); + source->SetType("PqSource"); } - - auto* egress = result.AddTask(); - egress->SetId(3); - auto* egressInput = egress->AddInputs(); - egressInput->AddChannels(); - - return result; -} - -struct TTestBootstrap : public TTestActorRuntime { + + auto* egress = result.AddTask(); + egress->SetId(3); + auto* egressInput = egress->AddInputs(); + egressInput->AddChannels(); + + return result; +} + +struct TTestBootstrap : public TTestActorRuntime { NYql::NDqProto::TReadyState GraphState; - NConfig::TCheckpointCoordinatorConfig Settings; - NActors::TActorId StorageProxy; - NActors::TActorId CheckpointCoordinator; + NConfig::TCheckpointCoordinatorConfig Settings; + NActors::TActorId StorageProxy; + NActors::TActorId CheckpointCoordinator; NActors::TActorId RunActor; - - NActors::TActorId IngressActor; - NActors::TActorId MapActor; - NActors::TActorId EgressActor; - - THashMap<TActorId, ui64> ActorToTask; - - NMonitoring::TDynamicCounterPtr Counters = new NMonitoring::TDynamicCounters(); - + + NActors::TActorId IngressActor; + NActors::TActorId MapActor; + NActors::TActorId EgressActor; + + THashMap<TActorId, ui64> ActorToTask; + + NMonitoring::TDynamicCounterPtr Counters = new NMonitoring::TDynamicCounters(); + explicit TTestBootstrap(ui64 graphFlags = 0) : TTestActorRuntime(true) , GraphState(BuildTestGraph(graphFlags)) { - TAutoPtr<TAppPrepare> app = new TAppPrepare(); - Initialize(app->Unwrap()); - StorageProxy = AllocateEdgeActor(); + TAutoPtr<TAppPrepare> app = new TAppPrepare(); + Initialize(app->Unwrap()); + StorageProxy = AllocateEdgeActor(); RunActor = AllocateEdgeActor(); - IngressActor = AllocateEdgeActor(); - MapActor = AllocateEdgeActor(); - EgressActor = AllocateEdgeActor(); - + IngressActor = AllocateEdgeActor(); + MapActor = AllocateEdgeActor(); + EgressActor = AllocateEdgeActor(); + ActorIdToProto(IngressActor, GraphState.AddActorId()); ActorIdToProto(MapActor, GraphState.AddActorId()); ActorIdToProto(EgressActor, GraphState.AddActorId()); - + ActorToTask[IngressActor] = GraphState.GetTask()[0].GetId(); ActorToTask[MapActor] = GraphState.GetTask()[1].GetId(); ActorToTask[EgressActor] = GraphState.GetTask()[2].GetId(); - - Settings = NConfig::TCheckpointCoordinatorConfig(); + + Settings = NConfig::TCheckpointCoordinatorConfig(); Settings.SetEnabled(true); - Settings.SetCheckpointingPeriodMillis(TDuration::Hours(1).MilliSeconds()); - Settings.SetMaxInflight(1); - - SetLogPriority(NKikimrServices::STREAMS_CHECKPOINT_COORDINATOR, NLog::PRI_DEBUG); - + Settings.SetCheckpointingPeriodMillis(TDuration::Hours(1).MilliSeconds()); + Settings.SetMaxInflight(1); + + SetLogPriority(NKikimrServices::STREAMS_CHECKPOINT_COORDINATOR, NLog::PRI_DEBUG); + CheckpointCoordinator = Register(MakeCheckpointCoordinator(TCoordinatorId("my-graph-id", 42), {}, StorageProxy, RunActor, Settings, Counters, NProto::TGraphParams()).Release()); WaitForBootstrap(); Send(new IEventHandle(CheckpointCoordinator, {}, new NYql::NDqs::TEvReadyState(std::move(GraphState)))); - EnableScheduleForActor(CheckpointCoordinator); - } + EnableScheduleForActor(CheckpointCoordinator); + } void WaitForBootstrap() { NActors::TDispatchOptions options; options.FinalEvents.emplace_back(NActors::TEvents::TSystem::Bootstrap, 1); DispatchEvents(options); } -}; -} // namespace - +}; +} // namespace + namespace NYq { - -void MockRegisterCoordinatorResponseEvent(TTestBootstrap& bootstrap, NYql::TIssues issues = NYql::TIssues()) { - bootstrap.Send(new IEventHandle( - bootstrap.CheckpointCoordinator, - bootstrap.StorageProxy, + +void MockRegisterCoordinatorResponseEvent(TTestBootstrap& bootstrap, NYql::TIssues issues = NYql::TIssues()) { + bootstrap.Send(new IEventHandle( + bootstrap.CheckpointCoordinator, + bootstrap.StorageProxy, new TEvCheckpointStorage::TEvRegisterCoordinatorResponse(std::move(issues)))); -} - -void MockCheckpointsMetadataResponse(TTestBootstrap& bootstrap, NYql::TIssues issues = NYql::TIssues()) { - bootstrap.Send(new IEventHandle( - bootstrap.CheckpointCoordinator, - bootstrap.StorageProxy, - new TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse(TVector<TCheckpointMetadata>(), std::move(issues)))); -} - +} + +void MockCheckpointsMetadataResponse(TTestBootstrap& bootstrap, NYql::TIssues issues = NYql::TIssues()) { + bootstrap.Send(new IEventHandle( + bootstrap.CheckpointCoordinator, + bootstrap.StorageProxy, + new TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse(TVector<TCheckpointMetadata>(), std::move(issues)))); +} + void MockCreateCheckpointResponse(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, NYql::TIssues issues = NYql::TIssues()) { - bootstrap.Send(new IEventHandle( - bootstrap.CheckpointCoordinator, - bootstrap.StorageProxy, + bootstrap.Send(new IEventHandle( + bootstrap.CheckpointCoordinator, + bootstrap.StorageProxy, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), "42"))); -} - +} + void MockNodeStateSavedEvent(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, TActorId& sender) { - auto ev = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>(); - ev->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); - ev->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo); - ev->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK); - bootstrap.Send(new IEventHandle( - bootstrap.CheckpointCoordinator, - sender, - ev.release())); -} - + auto ev = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>(); + ev->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); + ev->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo); + ev->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK); + bootstrap.Send(new IEventHandle( + bootstrap.CheckpointCoordinator, + sender, + ev.release())); +} + void MockNodeStateSaveFailedEvent(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, TActorId& sender) { - auto ev = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>(); - ev->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); - ev->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo); - ev->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR); - bootstrap.Send(new IEventHandle( - bootstrap.CheckpointCoordinator, - sender, - ev.release())); -} - + auto ev = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>(); + ev->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); + ev->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo); + ev->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR); + bootstrap.Send(new IEventHandle( + bootstrap.CheckpointCoordinator, + sender, + ev.release())); +} + void MockSetCheckpointPendingCommitStatusResponse(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, NYql::TIssues issues = NYql::TIssues()) { - bootstrap.Send(new IEventHandle( - bootstrap.CheckpointCoordinator, - bootstrap.StorageProxy, + bootstrap.Send(new IEventHandle( + bootstrap.CheckpointCoordinator, + bootstrap.StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse(checkpointId, std::move(issues)))); -} - +} + void MockChangesCommittedEvent(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, TActorId& sender) { - bootstrap.Send(new IEventHandle( - bootstrap.CheckpointCoordinator, - sender, - new NYql::NDq::TEvDqCompute::TEvStateCommitted(checkpointId.SeqNo, checkpointId.CoordinatorGeneration, bootstrap.ActorToTask[sender]))); -} - + bootstrap.Send(new IEventHandle( + bootstrap.CheckpointCoordinator, + sender, + new NYql::NDq::TEvDqCompute::TEvStateCommitted(checkpointId.SeqNo, checkpointId.CoordinatorGeneration, bootstrap.ActorToTask[sender]))); +} + void MockCompleteCheckpointResponse(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, NYql::TIssues issues = NYql::TIssues()) { - bootstrap.Send(new IEventHandle( - bootstrap.CheckpointCoordinator, - bootstrap.StorageProxy, + bootstrap.Send(new IEventHandle( + bootstrap.CheckpointCoordinator, + bootstrap.StorageProxy, new TEvCheckpointStorage::TEvCompleteCheckpointResponse(checkpointId, std::move(issues)))); -} - -Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { +} + +Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { void ShouldTriggerCheckpointImpl(ui64 graphFlags) { TTestBootstrap bootstrap(graphFlags); - - Cerr << "Waiting for TEvRegisterCoordinatorRequest (storage)" << Endl; + + Cerr << "Waiting for TEvRegisterCoordinatorRequest (storage)" << Endl; bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - MockRegisterCoordinatorResponseEvent(bootstrap); - - Cerr << "Waiting for TEvGetCheckpointsMetadataRequest (storage)" << Endl; - bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - MockCheckpointsMetadataResponse(bootstrap); - - Cerr << "Waiting for TEvCreateCheckpointRequest (storage)" << Endl; + bootstrap.StorageProxy, TDuration::Seconds(10)); + MockRegisterCoordinatorResponseEvent(bootstrap); + + Cerr << "Waiting for TEvGetCheckpointsMetadataRequest (storage)" << Endl; + bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest>( + bootstrap.StorageProxy, TDuration::Seconds(10)); + MockCheckpointsMetadataResponse(bootstrap); + + Cerr << "Waiting for TEvCreateCheckpointRequest (storage)" << Endl; auto updateState = bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvCreateCheckpointRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - - auto& checkpointId = updateState->Get()->CheckpointId; - MockCreateCheckpointResponse(bootstrap, checkpointId); - - Cerr << "Waiting for TEvInjectCheckpointBarrier (ingress)" << Endl; - bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvInjectCheckpoint>( - bootstrap.IngressActor, TDuration::Seconds(10)); - - MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.IngressActor); - MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.MapActor); - MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.EgressActor); - - Cerr << "Waiting for TEvSetCheckpointPendingCommitStatusRequest (storage)" << Endl; + bootstrap.StorageProxy, TDuration::Seconds(10)); + + auto& checkpointId = updateState->Get()->CheckpointId; + MockCreateCheckpointResponse(bootstrap, checkpointId); + + Cerr << "Waiting for TEvInjectCheckpointBarrier (ingress)" << Endl; + bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvInjectCheckpoint>( + bootstrap.IngressActor, TDuration::Seconds(10)); + + MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.IngressActor); + MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.MapActor); + MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.EgressActor); + + Cerr << "Waiting for TEvSetCheckpointPendingCommitStatusRequest (storage)" << Endl; bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - - MockSetCheckpointPendingCommitStatusResponse(bootstrap, checkpointId); - Cerr << "Waiting for TEvCommitChanges (ingress)" << Endl; - bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvCommitState>(bootstrap.IngressActor, TDuration::Seconds(10)); - Cerr << "Waiting for TEvCommitChanges (egress)" << Endl; - bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvCommitState>(bootstrap.EgressActor, TDuration::Seconds(10)); - - MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.IngressActor); - MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.MapActor); - MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.EgressActor); - - Cerr << "Waiting for TEvCompleteCheckpointRequest (storage)" << Endl; + bootstrap.StorageProxy, TDuration::Seconds(10)); + + MockSetCheckpointPendingCommitStatusResponse(bootstrap, checkpointId); + Cerr << "Waiting for TEvCommitChanges (ingress)" << Endl; + bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvCommitState>(bootstrap.IngressActor, TDuration::Seconds(10)); + Cerr << "Waiting for TEvCommitChanges (egress)" << Endl; + bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvCommitState>(bootstrap.EgressActor, TDuration::Seconds(10)); + + MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.IngressActor); + MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.MapActor); + MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.EgressActor); + + Cerr << "Waiting for TEvCompleteCheckpointRequest (storage)" << Endl; auto completed = bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvCompleteCheckpointRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - UNIT_ASSERT(completed.Get() != nullptr); - MockCompleteCheckpointResponse(bootstrap, checkpointId); - } - + bootstrap.StorageProxy, TDuration::Seconds(10)); + UNIT_ASSERT(completed.Get() != nullptr); + MockCompleteCheckpointResponse(bootstrap, checkpointId); + } + Y_UNIT_TEST(ShouldTriggerCheckpoint) { ShouldTriggerCheckpointImpl(0); } @@ -238,39 +238,39 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { ShouldTriggerCheckpointImpl(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask); } - Y_UNIT_TEST(ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved) { - TTestBootstrap bootstrap{ETestGraphFlags::InputWithSource}; - - Cerr << "Waiting for TEvRegisterCoordinatorRequest (storage)" << Endl; + Y_UNIT_TEST(ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved) { + TTestBootstrap bootstrap{ETestGraphFlags::InputWithSource}; + + Cerr << "Waiting for TEvRegisterCoordinatorRequest (storage)" << Endl; bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - MockRegisterCoordinatorResponseEvent(bootstrap); - - Cerr << "Waiting for TEvGetCheckpointsMetadataRequest (storage)" << Endl; - bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - MockCheckpointsMetadataResponse(bootstrap); - - Cerr << "Waiting for TEvCreateCheckpointRequest (storage)" << Endl; + bootstrap.StorageProxy, TDuration::Seconds(10)); + MockRegisterCoordinatorResponseEvent(bootstrap); + + Cerr << "Waiting for TEvGetCheckpointsMetadataRequest (storage)" << Endl; + bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest>( + bootstrap.StorageProxy, TDuration::Seconds(10)); + MockCheckpointsMetadataResponse(bootstrap); + + Cerr << "Waiting for TEvCreateCheckpointRequest (storage)" << Endl; auto updateState = bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvCreateCheckpointRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - UNIT_ASSERT(updateState->Get()->NodeCount == 3); - - auto& checkpointId = updateState->Get()->CheckpointId; - MockCreateCheckpointResponse(bootstrap, checkpointId); - - Cerr << "Waiting for TEvInjectCheckpointBarrier (ingress)" << Endl; - bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvInjectCheckpoint>(bootstrap.IngressActor, TDuration::Seconds(10)); - - MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.IngressActor); - MockNodeStateSaveFailedEvent(bootstrap, checkpointId, bootstrap.MapActor); - MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.EgressActor); - - Cerr << "Waiting for TEvCompleteCheckpointRequest (storage)" << Endl; + bootstrap.StorageProxy, TDuration::Seconds(10)); + UNIT_ASSERT(updateState->Get()->NodeCount == 3); + + auto& checkpointId = updateState->Get()->CheckpointId; + MockCreateCheckpointResponse(bootstrap, checkpointId); + + Cerr << "Waiting for TEvInjectCheckpointBarrier (ingress)" << Endl; + bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvInjectCheckpoint>(bootstrap.IngressActor, TDuration::Seconds(10)); + + MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.IngressActor); + MockNodeStateSaveFailedEvent(bootstrap, checkpointId, bootstrap.MapActor); + MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.EgressActor); + + Cerr << "Waiting for TEvCompleteCheckpointRequest (storage)" << Endl; auto completed = bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvAbortCheckpointRequest>( - bootstrap.StorageProxy, TDuration::Seconds(10)); - UNIT_ASSERT(completed.Get() != nullptr); - } -} - + bootstrap.StorageProxy, TDuration::Seconds(10)); + UNIT_ASSERT(completed.Get() != nullptr); + } +} + } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/ut/ya.make b/ydb/core/yq/libs/checkpointing/ut/ya.make index cf9700d584..f9a3b54437 100644 --- a/ydb/core/yq/libs/checkpointing/ut/ya.make +++ b/ydb/core/yq/libs/checkpointing/ut/ya.make @@ -1,20 +1,20 @@ UNITTEST_FOR(ydb/core/yq/libs/checkpointing) - + OWNER(g:yq) - -SRCS( - checkpoint_coordinator_ut.cpp -) - -PEERDIR( - library/cpp/testing/unittest + +SRCS( + checkpoint_coordinator_ut.cpp +) + +PEERDIR( + library/cpp/testing/unittest ydb/core/testlib/actors ydb/core/testlib/basics ydb/core/yq/libs/checkpointing -) - -SIZE(MEDIUM) - -YQL_LAST_ABI_VERSION() - +) + +SIZE(MEDIUM) + +YQL_LAST_ABI_VERSION() + END() diff --git a/ydb/core/yq/libs/checkpointing/utils.cpp b/ydb/core/yq/libs/checkpointing/utils.cpp index 22c6323214..63d427fa7c 100644 --- a/ydb/core/yq/libs/checkpointing/utils.cpp +++ b/ydb/core/yq/libs/checkpointing/utils.cpp @@ -1,5 +1,5 @@ -#include "utils.h" - +#include "utils.h" + namespace NYq { bool IsIngress(const NYql::NDqProto::TDqTask& task) { @@ -10,8 +10,8 @@ bool IsIngress(const NYql::NDqProto::TDqTask& task) { } } return true; -} - +} + bool IsEgress(const NYql::NDqProto::TDqTask& task) { for (const auto& output : task.GetOutputs()) { if (output.HasSink()) { @@ -19,11 +19,11 @@ bool IsEgress(const NYql::NDqProto::TDqTask& task) { } } return false; -} - +} + bool HasState(const NYql::NDqProto::TDqTask& task) { - Y_UNUSED(task); - return true; -} + Y_UNUSED(task); + return true; +} } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/utils.h b/ydb/core/yq/libs/checkpointing/utils.h index d0540804c7..b664554d50 100644 --- a/ydb/core/yq/libs/checkpointing/utils.h +++ b/ydb/core/yq/libs/checkpointing/utils.h @@ -1,13 +1,13 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/dq/proto/dq_tasks.pb.h> - + namespace NYq { - -bool IsIngress(const NYql::NDqProto::TDqTask& task); - -bool IsEgress(const NYql::NDqProto::TDqTask& task); - -bool HasState(const NYql::NDqProto::TDqTask& task); - + +bool IsIngress(const NYql::NDqProto::TDqTask& task); + +bool IsEgress(const NYql::NDqProto::TDqTask& task); + +bool HasState(const NYql::NDqProto::TDqTask& task); + } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/ya.make b/ydb/core/yq/libs/checkpointing/ya.make index 8a397f67ef..c16cc139ca 100644 --- a/ydb/core/yq/libs/checkpointing/ya.make +++ b/ydb/core/yq/libs/checkpointing/ya.make @@ -1,21 +1,21 @@ -OWNER( +OWNER( g:yq -) - -LIBRARY() - -SRCS( - checkpoint_coordinator.cpp - checkpoint_coordinator.h - checkpoint_id_generator.cpp - checkpoint_id_generator.h - pending_checkpoint.cpp - pending_checkpoint.h - utils.cpp - utils.h -) - -PEERDIR( +) + +LIBRARY() + +SRCS( + checkpoint_coordinator.cpp + checkpoint_coordinator.h + checkpoint_id_generator.cpp + checkpoint_id_generator.h + pending_checkpoint.cpp + pending_checkpoint.h + utils.cpp + utils.h +) + +PEERDIR( library/cpp/actors/core ydb/core/yq/libs/actors/logging ydb/core/yq/libs/checkpointing_common @@ -24,16 +24,16 @@ PEERDIR( ydb/library/yql/dq/proto ydb/library/yql/dq/state ydb/library/yql/providers/dq/api/protos -) - -YQL_LAST_ABI_VERSION() - -END() - -RECURSE( +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE( events ) RECURSE_FOR_TESTS( - ut -) + ut +) diff --git a/ydb/core/yq/libs/checkpointing_common/defs.cpp b/ydb/core/yq/libs/checkpointing_common/defs.cpp index e487b6542c..5a8e059657 100644 --- a/ydb/core/yq/libs/checkpointing_common/defs.cpp +++ b/ydb/core/yq/libs/checkpointing_common/defs.cpp @@ -14,12 +14,12 @@ TString TCoordinatorId::ToString() const { void TCoordinatorId::PrintTo(IOutputStream& out) const { - out << GraphId << "." << Generation; + out << GraphId << "." << Generation; } size_t TCheckpointIdHash::operator ()(const TCheckpointId& checkpointId) { - return MultiHash(checkpointId.CoordinatorGeneration, checkpointId.SeqNo); + return MultiHash(checkpointId.CoordinatorGeneration, checkpointId.SeqNo); } } // namespace NYq @@ -39,5 +39,5 @@ void Out<NYq::TCheckpointId>( IOutputStream& out, const NYq::TCheckpointId& checkpointId) { - out << checkpointId.CoordinatorGeneration << ":" << checkpointId.SeqNo; + out << checkpointId.CoordinatorGeneration << ":" << checkpointId.SeqNo; } diff --git a/ydb/core/yq/libs/checkpointing_common/defs.h b/ydb/core/yq/libs/checkpointing_common/defs.h index c57d1c4ff4..d67b50e62d 100644 --- a/ydb/core/yq/libs/checkpointing_common/defs.h +++ b/ydb/core/yq/libs/checkpointing_common/defs.h @@ -27,22 +27,22 @@ using TCoordinators = TVector<TCoordinatorId>; //////////////////////////////////////////////////////////////////////////////// struct TCheckpointId { - ui64 CoordinatorGeneration = 0; - ui64 SeqNo = 0; + ui64 CoordinatorGeneration = 0; + ui64 SeqNo = 0; - TCheckpointId(ui64 gen, ui64 SeqNo) - : CoordinatorGeneration(gen) - , SeqNo(SeqNo) + TCheckpointId(ui64 gen, ui64 SeqNo) + : CoordinatorGeneration(gen) + , SeqNo(SeqNo) { } bool operator ==(const TCheckpointId& rhs) const { - return CoordinatorGeneration == rhs.CoordinatorGeneration && SeqNo == rhs.SeqNo; + return CoordinatorGeneration == rhs.CoordinatorGeneration && SeqNo == rhs.SeqNo; } bool operator <(const TCheckpointId& rhs) const { - return CoordinatorGeneration < rhs.CoordinatorGeneration || - (CoordinatorGeneration == rhs.CoordinatorGeneration && SeqNo < rhs.SeqNo); + return CoordinatorGeneration < rhs.CoordinatorGeneration || + (CoordinatorGeneration == rhs.CoordinatorGeneration && SeqNo < rhs.SeqNo); } }; @@ -55,7 +55,7 @@ struct TCheckpointIdHash { //////////////////////////////////////////////////////////////////////////////// enum class ECheckpointStatus { - Unspecified, + Unspecified, Pending, PendingCommit, Completed, diff --git a/ydb/core/yq/libs/config/protos/yq_config.proto b/ydb/core/yq/libs/config/protos/yq_config.proto index 841b816821..f0a28cfcb0 100644 --- a/ydb/core/yq/libs/config/protos/yq_config.proto +++ b/ydb/core/yq/libs/config/protos/yq_config.proto @@ -29,7 +29,7 @@ message TConfig { TCommonConfig Common = 2; TControlPlaneStorageConfig ControlPlaneStorage = 3; TControlPlaneProxyConfig ControlPlaneProxy = 4; - NKikimrProto.NFolderService.TFolderServiceConfig FolderService = 5; + NKikimrProto.NFolderService.TFolderServiceConfig FolderService = 5; TPrivateApiConfig PrivateApi = 6; TTokenAccessorConfig TokenAccessor = 7; TDbPoolConfig DbPool = 8; diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp index 53838351b4..4146ba80f2 100644 --- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -138,11 +138,11 @@ public: CPP_LOG_T("Request actor. Actor id: " << SelfId()); Become(&TRequestActor::StateFunc, GetDuration(Config.GetRequestTimeout(), TDuration::Seconds(30)), new NActors::TEvents::TEvWakeup()); if constexpr (ResolveFolder) { - auto request = std::make_unique<NKikimr::NFolderService::TEvFolderService::TEvGetFolderRequest>(); - request->Request.set_folder_id(FolderId); - request->Token = Token; + auto request = std::make_unique<NKikimr::NFolderService::TEvFolderService::TEvGetFolderRequest>(); + request->Request.set_folder_id(FolderId); + request->Token = Token; ResolveFolderCounters->InFly->Inc(); - Send(NKikimr::NFolderService::FolderServiceActorId(), request.release(), 0, 0); + Send(NKikimr::NFolderService::FolderServiceActorId(), request.release(), 0, 0); } else { Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, Permissions), 0, Cookie); } @@ -151,7 +151,7 @@ public: STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); hFunc(TResponse, Handle); - hFunc(NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse, Handle); + hFunc(NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse, Handle); ) void HandleTimeout() { @@ -171,20 +171,20 @@ public: PassAway(); } - void Handle(NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse::TPtr& ev) { + void Handle(NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse::TPtr& ev) { ResolveFolderCounters->InFly->Dec(); ResolveFolderCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); - const auto& response = ev->Get()->Response; - TString errorMessage; + const auto& response = ev->Get()->Response; + TString errorMessage; - const auto& status = ev->Get()->Status; - if (!status.Ok() || !ev->Get()->Response.has_folder()) { + const auto& status = ev->Get()->Status; + if (!status.Ok() || !ev->Get()->Response.has_folder()) { ResolveFolderCounters->Error->Inc(); - errorMessage = "Msg: " + status.Msg + " Details: " + status.Details + " Code: " + ToString(status.GRpcStatusCode) + " InternalError: " + ToString(status.InternalError); - CPP_LOG_E(errorMessage); + errorMessage = "Msg: " + status.Msg + " Details: " + status.Details + " Code: " + ToString(status.GRpcStatusCode) + " InternalError: " + ToString(status.InternalError); + CPP_LOG_E(errorMessage); NYql::TIssues issues; - NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Resolve folder error"); + NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Resolve folder error"); issues.AddIssue(issue); Counters->Error->Inc(); Counters->Timeout->Inc(); @@ -196,7 +196,7 @@ public: } ResolveFolderCounters->Ok->Inc(); - CloudId = response.folder().cloud_id(); + CloudId = response.folder().cloud_id(); CPP_LOG_T("Cloud id: " << CloudId << " Folder id: " << FolderId); if constexpr (ResolveFolder) { Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions), 0, Cookie); diff --git a/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp index 3d0c5bb70b..5116f14855 100644 --- a/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp +++ b/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp @@ -18,7 +18,7 @@ #include <ydb/core/base/path.h> #include <ydb/library/folder_service/folder_service.h> -#include <ydb/library/folder_service/mock/mock_folder_service.h> +#include <ydb/library/folder_service/mock/mock_folder_service.h> #include <util/system/env.h> @@ -356,9 +356,9 @@ private: ControlPlaneProxyActorId(), TActorSetupCmd(controlPlaneProxy, TMailboxType::Simple, 0)); - auto folderService = NKikimr::NFolderService::CreateMockFolderServiceActor(NKikimrProto::NFolderService::TFolderServiceConfig{}); + auto folderService = NKikimr::NFolderService::CreateMockFolderServiceActor(NKikimrProto::NFolderService::TFolderServiceConfig{}); runtime->AddLocalService( - NKikimr::NFolderService::FolderServiceActorId(), + NKikimr::NFolderService::FolderServiceActorId(), TActorSetupCmd(folderService, TMailboxType::Simple, 0), 0 ); diff --git a/ydb/core/yq/libs/control_plane_proxy/ut/ya.make b/ydb/core/yq/libs/control_plane_proxy/ut/ya.make index 1550392ebc..0171764aa4 100644 --- a/ydb/core/yq/libs/control_plane_proxy/ut/ya.make +++ b/ydb/core/yq/libs/control_plane_proxy/ut/ya.make @@ -10,7 +10,7 @@ PEERDIR( ydb/core/yq/libs/control_plane_storage ydb/core/yq/libs/test_connection ydb/library/folder_service - ydb/library/folder_service/mock + ydb/library/folder_service/mock ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp b/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp index 8274a88589..ec3156870b 100644 --- a/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp +++ b/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp @@ -6,13 +6,13 @@ TRequestCounters::TRequestCounters(const TString& name) : Name(name) {} void TRequestCounters::Register(const NMonitoring::TDynamicCounterPtr& counters) { - auto requestCounters = counters->GetSubgroup("request", Name); - InFly = requestCounters->GetCounter("InFly", false); - Ok = requestCounters->GetCounter("Ok", true); - Error = requestCounters->GetCounter("Error", true); - Retry = requestCounters->GetCounter("Retry", true); - LatencyMs = requestCounters->GetHistogram("LatencyMs", GetLatencyHistogramBuckets()); - Issues = requestCounters->GetSubgroup("subcomponent", "Issues"); + auto requestCounters = counters->GetSubgroup("request", Name); + InFly = requestCounters->GetCounter("InFly", false); + Ok = requestCounters->GetCounter("Ok", true); + Error = requestCounters->GetCounter("Error", true); + Retry = requestCounters->GetCounter("Retry", true); + LatencyMs = requestCounters->GetHistogram("LatencyMs", GetLatencyHistogramBuckets()); + Issues = requestCounters->GetSubgroup("subcomponent", "Issues"); } NMonitoring::IHistogramCollectorPtr TRequestCounters::GetLatencyHistogramBuckets() { diff --git a/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h b/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h index 58d3c56b27..095656157d 100644 --- a/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h +++ b/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h @@ -14,7 +14,7 @@ public: NMonitoring::TDynamicCounters::TCounterPtr Error; NMonitoring::TDynamicCounters::TCounterPtr Retry; NMonitoring::THistogramPtr LatencyMs; - NMonitoring::TDynamicCounterPtr Issues; + NMonitoring::TDynamicCounterPtr Issues; explicit TRequestCounters(const TString& name); diff --git a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto index e6ebff0c84..2bc7aded3b 100644 --- a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto @@ -24,7 +24,7 @@ message QueryInternal { YandexQuery.QueryAction action = 7; string ast = 8; // deprected and should not be used, will be removed in future ExecuteMode execute_mode = 9; - StateLoadMode state_load_mode = 10; + StateLoadMode state_load_mode = 10; string cloud_id = 11; repeated Yq.Private.TopicConsumer created_topic_consumers = 12; repeated bytes dq_graph = 13; diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 222ebb6e8e..75cb62bb8d 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -236,7 +236,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateNodesTable(TActorSystem* as) .AddNullableColumn(INTERCONNECT_PORT_COLUMN_NAME, EPrimitiveType::Uint32) .AddNullableColumn(NODE_ADDRESS_COLUMN_NAME, EPrimitiveType::String) .SetTtlSettings(EXPIRE_AT_COLUMN_NAME) - .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, NODE_ID_COLUMN_NAME}) + .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, NODE_ID_COLUMN_NAME}) .Build(); return YdbConnection->Client.RetryOperation( diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index 1c8cda7ee7..0387e3d8cb 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -637,12 +637,12 @@ private: event->DebugInfo = debugInfo; actorSystem->Send(new IEventHandle(ev->Sender, self, event.release(), 0, ev->Cookie)); requestCounters->Error->Inc(); - for (const auto& issue : issues) { - NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) { - Y_UNUSED(level); - requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc(); - }); - } + for (const auto& issue : issues) { + NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) { + Y_UNUSED(level); + requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc(); + }); + } } else { CPS_LOG_AS_T(*actorSystem, name << ": " << request.DebugString() << " success"); auto event = std::make_unique<ResponseEvent>(result); @@ -708,12 +708,12 @@ private: event->DebugInfo = debugInfo; actorSystem->Send(new IEventHandle(ev->Sender, self, event.release(), 0, ev->Cookie)); requestCounters->Error->Inc(); - for (const auto& issue : issues) { - NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) { - Y_UNUSED(level); - requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc(); - }); - } + for (const auto& issue : issues) { + NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) { + Y_UNUSED(level); + requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc(); + }); + } } else { CPS_LOG_AS_T(*actorSystem, name << ": " << request.DebugString() << " success"); auto event = std::make_unique<ResponseEvent>(result, auditDetails); @@ -774,12 +774,12 @@ private: event->DebugInfo = debugInfo; actorSystem->Send(new IEventHandle(ev->Sender, self, event.release(), 0, ev->Cookie)); requestCounters->Error->Inc(); - for (const auto& issue : issues) { - NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) { - Y_UNUSED(level); - requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc(); - }); - } + for (const auto& issue : issues) { + NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) { + Y_UNUSED(level); + requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc(); + }); + } } else { CPS_LOG_AS_T(*actorSystem, name << ": success"); auto event = std::unique_ptr<ResponseEvent>(new ResponseEvent(std::make_from_tuple<ResponseEvent>(result))); diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 398725f7f4..988c8c3963 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -193,7 +193,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery if (query.ByteSizeLong() > Config.Proto.GetMaxRequestSize()) { ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "Query data is not placed in the table. Please shorten your request"; } - + if (queryInternal.ByteSizeLong() > Config.Proto.GetMaxRequestSize()) { ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "Query internal data is not placed in the table. Please reduce the number of connections and bindings"; } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index d66ac202bf..2f47844e4c 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -102,7 +102,7 @@ void Init( const TAppData* appData, const TString& tenant, ::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections, - const IYqSharedResources::TPtr& iyqSharedResources, + const IYqSharedResources::TPtr& iyqSharedResources, const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory, const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig)>& auditServiceFactory, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, @@ -145,8 +145,8 @@ void Init( // if not enabled then stub { - auto folderService = folderServiceFactory(protoConfig.GetFolderService()); - actorRegistrator(NKikimr::NFolderService::FolderServiceActorId(), folderService); + auto folderService = folderServiceFactory(protoConfig.GetFolderService()); + actorRegistrator(NKikimr::NFolderService::FolderServiceActorId(), folderService); } if (protoConfig.GetCheckpointCoordinator().GetEnabled()) { diff --git a/ydb/core/yq/libs/init/init.h b/ydb/core/yq/libs/init/init.h index fa26651f18..d44711f959 100644 --- a/ydb/core/yq/libs/init/init.h +++ b/ydb/core/yq/libs/init/init.h @@ -10,7 +10,7 @@ #include <ydb/library/folder_service/proto/config.pb.h> #include <ydb/core/yq/libs/config/protos/audit.pb.h> - + #include <ydb/library/yql/providers/pq/cm_client/interface/client.h> #include <library/cpp/actors/core/actor.h> @@ -33,11 +33,11 @@ void Init( const NKikimr::TAppData* appData, const TString& tenant, ::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections, - const IYqSharedResources::TPtr& yqSharedResources, + const IYqSharedResources::TPtr& yqSharedResources, const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory, const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig)>& auditServiceFactory, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const ui32& icPort -); +); } // NYq diff --git a/ydb/core/yq/libs/ya.make b/ydb/core/yq/libs/ya.make index ccc88a55b6..1b1cd0eed6 100644 --- a/ydb/core/yq/libs/ya.make +++ b/ydb/core/yq/libs/ya.make @@ -4,8 +4,8 @@ RECURSE( actors audit checkpoint_storage - checkpointing - checkpointing_common + checkpointing + checkpointing_common common config control_plane_proxy diff --git a/ydb/library/folder_service/events.h b/ydb/library/folder_service/events.h index 71329b1d4f..53255a11c8 100644 --- a/ydb/library/folder_service/events.h +++ b/ydb/library/folder_service/events.h @@ -1,34 +1,34 @@ -#pragma once - +#pragma once + #include <ydb/library/folder_service/proto/folder_service.pb.h> #include <ydb/core/base/events.h> - -#include <library/cpp/grpc/client/grpc_client_low.h> - -namespace NKikimr::NFolderService { - -struct TEvFolderService { - enum EEv { - // requests - EvGetFolderRequest = EventSpaceBegin(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER), - - // replies - EvGetFolderResponse = EventSpaceBegin(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER) + 512, - - EvEnd - }; - - static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER)"); - - struct TEvGetFolderRequest : TEventLocal<TEvGetFolderRequest, EvGetFolderRequest> { - NKikimrProto::NFolderService::GetFolderRequest Request; - TString Token; - TString RequestId; - }; - - struct TEvGetFolderResponse : TEventLocal<TEvGetFolderResponse, EvGetFolderResponse> { - NKikimrProto::NFolderService::GetFolderResponse Response; - NGrpc::TGrpcStatus Status; - }; -}; -} // namespace NKikimr::NFolderService + +#include <library/cpp/grpc/client/grpc_client_low.h> + +namespace NKikimr::NFolderService { + +struct TEvFolderService { + enum EEv { + // requests + EvGetFolderRequest = EventSpaceBegin(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER), + + // replies + EvGetFolderResponse = EventSpaceBegin(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER) + 512, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER)"); + + struct TEvGetFolderRequest : TEventLocal<TEvGetFolderRequest, EvGetFolderRequest> { + NKikimrProto::NFolderService::GetFolderRequest Request; + TString Token; + TString RequestId; + }; + + struct TEvGetFolderResponse : TEventLocal<TEvGetFolderResponse, EvGetFolderResponse> { + NKikimrProto::NFolderService::GetFolderResponse Response; + NGrpc::TGrpcStatus Status; + }; +}; +} // namespace NKikimr::NFolderService diff --git a/ydb/library/folder_service/folder_service.cpp b/ydb/library/folder_service/folder_service.cpp index 1189c4005c..b9d770d1fa 100644 --- a/ydb/library/folder_service/folder_service.cpp +++ b/ydb/library/folder_service/folder_service.cpp @@ -1,10 +1,10 @@ -#include "folder_service.h" - -namespace NKikimr::NFolderService { - -NActors::TActorId FolderServiceActorId() { - constexpr TStringBuf name = "FLDRSRVS"; - return NActors::TActorId(0, name); -} - -} // namespace NKikimr::NFolderService
\ No newline at end of file +#include "folder_service.h" + +namespace NKikimr::NFolderService { + +NActors::TActorId FolderServiceActorId() { + constexpr TStringBuf name = "FLDRSRVS"; + return NActors::TActorId(0, name); +} + +} // namespace NKikimr::NFolderService
\ No newline at end of file diff --git a/ydb/library/folder_service/folder_service.h b/ydb/library/folder_service/folder_service.h index 142118c3c9..d1d4cf0305 100644 --- a/ydb/library/folder_service/folder_service.h +++ b/ydb/library/folder_service/folder_service.h @@ -1,12 +1,12 @@ -#pragma once - +#pragma once + #include <ydb/library/folder_service/proto/config.pb.h> #include <library/cpp/actors/core/actor.h> -namespace NKikimr::NFolderService { +namespace NKikimr::NFolderService { -NActors::TActorId FolderServiceActorId(); +NActors::TActorId FolderServiceActorId(); -NActors::IActor* CreateFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig& config); +NActors::IActor* CreateFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig& config); -} // namespace NKikimr::NFolderService +} // namespace NKikimr::NFolderService diff --git a/ydb/library/folder_service/mock/mock_folder_service.cpp b/ydb/library/folder_service/mock/mock_folder_service.cpp index 68f6d1d64d..c08ba6f5d9 100644 --- a/ydb/library/folder_service/mock/mock_folder_service.cpp +++ b/ydb/library/folder_service/mock/mock_folder_service.cpp @@ -1,40 +1,40 @@ #include <ydb/library/folder_service/mock/mock_folder_service.h> #include <ydb/library/folder_service/events.h> -#include <library/cpp/actors/core/hfunc.h> - -namespace NKikimr::NFolderService { - -class TFolderServiceMock - : public NActors::TActor<TFolderServiceMock> { - using TThis = TFolderServiceMock; - using TBase = NActors::TActor<TFolderServiceMock>; - - using TEvListFolderRequest = NKikimr::NFolderService::TEvFolderService::TEvGetFolderRequest; - using TEvListFolderResponse = NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse; - -public: - TFolderServiceMock() - : TBase(&TThis::StateWork) { - } - - void Handle(TEvListFolderRequest::TPtr& ev) { - auto result = std::make_unique<TEvListFolderResponse>(); - auto* fakeFolder = result->Response.mutable_folder(); - fakeFolder->set_id(ev.Get()->Get()->Request.folder_id()); - fakeFolder->set_cloud_id("mock_cloud"); - result->Status = NGrpc::TGrpcStatus(); - Send(ev->Sender, result.release()); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvListFolderRequest, Handle) - cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway) - } - } -}; - -NActors::IActor* CreateMockFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig&) { - return new TFolderServiceMock(); -} -} // namespace NKikimr::NFolderService +#include <library/cpp/actors/core/hfunc.h> + +namespace NKikimr::NFolderService { + +class TFolderServiceMock + : public NActors::TActor<TFolderServiceMock> { + using TThis = TFolderServiceMock; + using TBase = NActors::TActor<TFolderServiceMock>; + + using TEvListFolderRequest = NKikimr::NFolderService::TEvFolderService::TEvGetFolderRequest; + using TEvListFolderResponse = NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse; + +public: + TFolderServiceMock() + : TBase(&TThis::StateWork) { + } + + void Handle(TEvListFolderRequest::TPtr& ev) { + auto result = std::make_unique<TEvListFolderResponse>(); + auto* fakeFolder = result->Response.mutable_folder(); + fakeFolder->set_id(ev.Get()->Get()->Request.folder_id()); + fakeFolder->set_cloud_id("mock_cloud"); + result->Status = NGrpc::TGrpcStatus(); + Send(ev->Sender, result.release()); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvListFolderRequest, Handle) + cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway) + } + } +}; + +NActors::IActor* CreateMockFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig&) { + return new TFolderServiceMock(); +} +} // namespace NKikimr::NFolderService diff --git a/ydb/library/folder_service/mock/mock_folder_service.h b/ydb/library/folder_service/mock/mock_folder_service.h index 301ad2f551..9fc55ad95c 100644 --- a/ydb/library/folder_service/mock/mock_folder_service.h +++ b/ydb/library/folder_service/mock/mock_folder_service.h @@ -1,10 +1,10 @@ -#pragma once - +#pragma once + #include <ydb/library/folder_service/proto/config.pb.h> - -#include <library/cpp/actors/core/actor.h> - -namespace NKikimr::NFolderService { - -NActors::IActor* CreateMockFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig& config); -} + +#include <library/cpp/actors/core/actor.h> + +namespace NKikimr::NFolderService { + +NActors::IActor* CreateMockFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig& config); +} diff --git a/ydb/library/folder_service/mock/ya.make b/ydb/library/folder_service/mock/ya.make index b502c4306a..bd42486475 100644 --- a/ydb/library/folder_service/mock/ya.make +++ b/ydb/library/folder_service/mock/ya.make @@ -1,17 +1,17 @@ OWNER(g:yq) - -LIBRARY() - -SRCS( - mock_folder_service.cpp -) - -PEERDIR( - library/cpp/actors/core + +LIBRARY() + +SRCS( + mock_folder_service.cpp +) + +PEERDIR( + library/cpp/actors/core ydb/library/folder_service ydb/library/folder_service/proto -) - -YQL_LAST_ABI_VERSION() - -END() +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/folder_service/proto/config.proto b/ydb/library/folder_service/proto/config.proto index 06981810a3..94fadb0f41 100644 --- a/ydb/library/folder_service/proto/config.proto +++ b/ydb/library/folder_service/proto/config.proto @@ -1,9 +1,9 @@ -syntax = "proto3"; - -package NKikimrProto.NFolderService; - -message TFolderServiceConfig { - bool Enable = 1; - string Endpoint = 2; - string PathToRootCA = 3; -} +syntax = "proto3"; + +package NKikimrProto.NFolderService; + +message TFolderServiceConfig { + bool Enable = 1; + string Endpoint = 2; + string PathToRootCA = 3; +} diff --git a/ydb/library/folder_service/proto/folder_service.proto b/ydb/library/folder_service/proto/folder_service.proto index bb4fb4c2a6..39f77f34ca 100644 --- a/ydb/library/folder_service/proto/folder_service.proto +++ b/ydb/library/folder_service/proto/folder_service.proto @@ -1,36 +1,36 @@ -syntax = "proto3"; - -import "google/protobuf/timestamp.proto"; - -package NKikimrProto.NFolderService; - -// copied from cloud/bitbucket/private-api/yandex/cloud/priv/resourcemanager/v1/folder_service.proto - -message Folder { - - enum Status { - STATUS_UNSPECIFIED = 0; - ACTIVE = 1; - DELETING = 2; - DELETED = 3; - PENDING_DELETION = 4; - } - - string id = 1; - string cloud_id = 2; - - google.protobuf.Timestamp created_at = 3; - string name = 4; - string description = 5; - map<string, string> labels = 6; - - Status status = 7; -} - -message GetFolderRequest { - string folder_id = 1; -} - -message GetFolderResponse { - Folder folder = 1; -} +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +package NKikimrProto.NFolderService; + +// copied from cloud/bitbucket/private-api/yandex/cloud/priv/resourcemanager/v1/folder_service.proto + +message Folder { + + enum Status { + STATUS_UNSPECIFIED = 0; + ACTIVE = 1; + DELETING = 2; + DELETED = 3; + PENDING_DELETION = 4; + } + + string id = 1; + string cloud_id = 2; + + google.protobuf.Timestamp created_at = 3; + string name = 4; + string description = 5; + map<string, string> labels = 6; + + Status status = 7; +} + +message GetFolderRequest { + string folder_id = 1; +} + +message GetFolderResponse { + Folder folder = 1; +} diff --git a/ydb/library/folder_service/proto/ya.make b/ydb/library/folder_service/proto/ya.make index 1b1c5dc6bb..154c56f8a7 100644 --- a/ydb/library/folder_service/proto/ya.make +++ b/ydb/library/folder_service/proto/ya.make @@ -1,12 +1,12 @@ OWNER(g:yq) - -PROTO_LIBRARY() - -SRCS( - config.proto - folder_service.proto -) - -EXCLUDE_TAGS(GO_PROTO) - -END() + +PROTO_LIBRARY() + +SRCS( + config.proto + folder_service.proto +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/ydb/library/folder_service/ya.make b/ydb/library/folder_service/ya.make index 5ca60e0271..897e56bf49 100644 --- a/ydb/library/folder_service/ya.make +++ b/ydb/library/folder_service/ya.make @@ -1,22 +1,22 @@ OWNER(g:yq) - -LIBRARY() - -SRCS( + +LIBRARY() + +SRCS( events.h - folder_service.cpp + folder_service.cpp folder_service.h -) - -PEERDIR( - library/cpp/actors/core +) + +PEERDIR( + library/cpp/actors/core ydb/core/base ydb/library/folder_service/proto -) - -END() - -RECURSE( - mock - proto -) +) + +END() + +RECURSE( + mock + proto +) diff --git a/ydb/library/yql/dq/actors/compute/dq_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_checkpoints.h index e6775602a4..de7c2cea33 100644 --- a/ydb/library/yql/dq/actors/compute/dq_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_checkpoints.h @@ -1,14 +1,14 @@ -#pragma once - +#pragma once + #include <library/cpp/actors/core/actorid.h> namespace NYql { namespace NDq { -inline static NActors::TActorId MakeCheckpointStorageID() { - const char name[12] = "cp_storage"; - return NActors::TActorId(0, TStringBuf(name, 12)); -} +inline static NActors::TActorId MakeCheckpointStorageID() { + const char name[12] = "cp_storage"; + return NActors::TActorId(0, TStringBuf(name, 12)); +} } // namespace NDq } // namespace NYql diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 84eeda2edb..3eaf2d1246 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -54,20 +54,20 @@ struct TEvDqCompute { const ui64 FromSeqNo; const ui64 ToSeqNo; }; - + struct TEvRun : public NActors::TEventPB<TEvRun, NDqProto::TEvRun, TDqComputeEvents::EvRun> {}; - + struct TEvNewCheckpointCoordinator : public NActors::TEventPB<TEvNewCheckpointCoordinator, - NDqProto::TEvNewCheckpointCoordinator, TDqComputeEvents::EvNewCheckpointCoordinator> { - - TEvNewCheckpointCoordinator() = default; - - TEvNewCheckpointCoordinator(ui64 generation, TString graphId) { - Record.SetGeneration(generation); - Record.SetGraphId(std::move(graphId)); - } - }; - + NDqProto::TEvNewCheckpointCoordinator, TDqComputeEvents::EvNewCheckpointCoordinator> { + + TEvNewCheckpointCoordinator() = default; + + TEvNewCheckpointCoordinator(ui64 generation, TString graphId) { + Record.SetGeneration(generation); + Record.SetGraphId(std::move(graphId)); + } + }; + struct TEvNewCheckpointCoordinatorAck : public NActors::TEventPB<TEvNewCheckpointCoordinatorAck, NDqProto::TEvNewCheckpointCoordinatorAck, TDqComputeEvents::EvNewCheckpointCoordinatorAck> { @@ -75,63 +75,63 @@ struct TEvDqCompute { }; struct TEvInjectCheckpoint : public NActors::TEventPB<TEvInjectCheckpoint, - NDqProto::TEvInjectCheckpoint, TDqComputeEvents::EvInjectCheckpoint> { - - TEvInjectCheckpoint() = default; - - TEvInjectCheckpoint(ui64 id, ui64 generation) { - Record.MutableCheckpoint()->SetId(id); - Record.MutableCheckpoint()->SetGeneration(generation); - Record.SetGeneration(generation); - } - }; - + NDqProto::TEvInjectCheckpoint, TDqComputeEvents::EvInjectCheckpoint> { + + TEvInjectCheckpoint() = default; + + TEvInjectCheckpoint(ui64 id, ui64 generation) { + Record.MutableCheckpoint()->SetId(id); + Record.MutableCheckpoint()->SetGeneration(generation); + Record.SetGeneration(generation); + } + }; + struct TEvSaveTaskState : public NActors::TEventLocal<TEvSaveTaskState, TDqComputeEvents::EvSaveTaskState> { TEvSaveTaskState(TString graphId, ui64 taskId, NDqProto::TCheckpoint checkpoint) - : GraphId(std::move(graphId)) - , TaskId(taskId) - , Checkpoint(std::move(checkpoint)) + : GraphId(std::move(graphId)) + , TaskId(taskId) + , Checkpoint(std::move(checkpoint)) {} - - const TString GraphId; - const ui64 TaskId; + + const TString GraphId; + const ui64 TaskId; const NDqProto::TCheckpoint Checkpoint; NDqProto::TComputeActorState State; - }; - + }; + struct TEvSaveTaskStateResult : public NActors::TEventPB<TEvSaveTaskStateResult, NDqProto::TEvSaveTaskStateResult, TDqComputeEvents::EvSaveTaskStateResult> {}; - + struct TEvCommitState : public NActors::TEventPB<TEvCommitState, - NDqProto::TEvCommitState, TDqComputeEvents::EvCommitState> { - - TEvCommitState() = default; - - TEvCommitState(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) { - Record.MutableCheckpoint()->SetId(checkpointId); - Record.MutableCheckpoint()->SetGeneration(checkpointGeneration); - Record.SetGeneration(coordinatorGeneration); - } - }; - + NDqProto::TEvCommitState, TDqComputeEvents::EvCommitState> { + + TEvCommitState() = default; + + TEvCommitState(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) { + Record.MutableCheckpoint()->SetId(checkpointId); + Record.MutableCheckpoint()->SetGeneration(checkpointGeneration); + Record.SetGeneration(coordinatorGeneration); + } + }; + struct TEvStateCommitted : public NActors::TEventPB<TEvStateCommitted, - NDqProto::TEvStateCommitted, TDqComputeEvents::EvStateCommitted> { - - TEvStateCommitted() = default; - - TEvStateCommitted(ui64 id, ui64 generation, ui64 taskId) { - Record.MutableCheckpoint()->SetId(id); - Record.MutableCheckpoint()->SetGeneration(generation); - Record.SetTaskId(taskId); - } - }; - + NDqProto::TEvStateCommitted, TDqComputeEvents::EvStateCommitted> { + + TEvStateCommitted() = default; + + TEvStateCommitted(ui64 id, ui64 generation, ui64 taskId) { + Record.MutableCheckpoint()->SetId(id); + Record.MutableCheckpoint()->SetGeneration(generation); + Record.SetTaskId(taskId); + } + }; + struct TEvRestoreFromCheckpoint : public NActors::TEventPB<TEvRestoreFromCheckpoint, - NDqProto::TEvRestoreFromCheckpoint, TDqComputeEvents::EvRestoreFromCheckpoint> { - - TEvRestoreFromCheckpoint() = default; - - TEvRestoreFromCheckpoint(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) { + NDqProto::TEvRestoreFromCheckpoint, TDqComputeEvents::EvRestoreFromCheckpoint> { + + TEvRestoreFromCheckpoint() = default; + + TEvRestoreFromCheckpoint(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) { Init(checkpointId, checkpointGeneration, coordinatorGeneration); Record.MutableStateLoadPlan()->SetStateType(NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN); // default } @@ -143,16 +143,16 @@ struct TEvDqCompute { private: void Init(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) { - Record.MutableCheckpoint()->SetId(checkpointId); - Record.MutableCheckpoint()->SetGeneration(checkpointGeneration); - Record.SetGeneration(coordinatorGeneration); - } - }; - + Record.MutableCheckpoint()->SetId(checkpointId); + Record.MutableCheckpoint()->SetGeneration(checkpointGeneration); + Record.SetGeneration(coordinatorGeneration); + } + }; + struct TEvRestoreFromCheckpointResult : public NActors::TEventPB<TEvRestoreFromCheckpointResult, NDqProto::TEvRestoreFromCheckpointResult, TDqComputeEvents::EvRestoreFromCheckpointResult> { using TBaseEventPB = NActors::TEventPB<TEvRestoreFromCheckpointResult, NDqProto::TEvRestoreFromCheckpointResult, TDqComputeEvents::EvRestoreFromCheckpointResult>; - + using TBaseEventPB::TBaseEventPB; TEvRestoreFromCheckpointResult(const NDqProto::TCheckpoint& checkpoint, ui64 taskId, NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status) { @@ -164,28 +164,28 @@ struct TEvDqCompute { struct TEvGetTaskState : public NActors::TEventLocal<TEvGetTaskState, TDqComputeEvents::EvGetTaskState> { TEvGetTaskState(TString graphId, const std::vector<ui64>& taskIds, NDqProto::TCheckpoint checkpoint, ui64 generation) - : GraphId(std::move(graphId)) + : GraphId(std::move(graphId)) , TaskIds(taskIds) - , Checkpoint(std::move(checkpoint)) + , Checkpoint(std::move(checkpoint)) , Generation(generation) {} - - const TString GraphId; + + const TString GraphId; const std::vector<ui64> TaskIds; const NDqProto::TCheckpoint Checkpoint; - const ui64 Generation; - }; - + const ui64 Generation; + }; + struct TEvGetTaskStateResult : public NActors::TEventLocal<TEvGetTaskStateResult, TDqComputeEvents::EvGetTaskStateResult> { TEvGetTaskStateResult(NDqProto::TCheckpoint checkpoint, TIssues issues, ui64 generation) - : Checkpoint(std::move(checkpoint)) - , Issues(std::move(issues)) + : Checkpoint(std::move(checkpoint)) + , Issues(std::move(issues)) , Generation(generation) {} - + const NDqProto::TCheckpoint Checkpoint; std::vector<NDqProto::TComputeActorState> States; const TIssues Issues; - const ui64 Generation; - }; + const ui64 Generation; + }; }; struct TDqExecutionSettings { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index 9d943cbca3..c5368313b8 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -1,37 +1,37 @@ -#include "dq_compute_actor_checkpoints.h" -#include "dq_checkpoints.h" +#include "dq_compute_actor_checkpoints.h" +#include "dq_checkpoints.h" #include "dq_compute_actor_impl.h" - + #include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h> #include <algorithm> -#define LOG_D(s) \ +#define LOG_D(s) \ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s) -#define LOG_I(s) \ +#define LOG_I(s) \ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s) -#define LOG_W(s) \ +#define LOG_W(s) \ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s) -#define LOG_E(s) \ +#define LOG_E(s) \ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s) - -#define LOG_CP_D(s) \ + +#define LOG_CP_D(s) \ LOG_D("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) -#define LOG_CP_I(s) \ +#define LOG_CP_I(s) \ LOG_I("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) -#define LOG_CP_E(s) \ +#define LOG_CP_E(s) \ LOG_E("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) - + namespace NYql::NDq { - + using namespace NActors; namespace { TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) { return TStringBuilder() << checkpoint.GetGeneration() << "." << checkpoint.GetId(); -} - +} + bool IsIngressTask(const NDqProto::TDqTask& task) { for (const auto& input : task.GetInputs()) { if (!input.HasSource()) { @@ -106,16 +106,16 @@ NDqProto::TComputeActorState CombineForeignState( } // namespace TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor) - : TActor(&TDqComputeActorCheckpoints::StateFunc) + : TActor(&TDqComputeActorCheckpoints::StateFunc) , TxId(txId) - , Task(std::move(task)) + , Task(std::move(task)) , IngressTask(IsIngressTask(Task)) - , CheckpointStorage(MakeCheckpointStorageID()) + , CheckpointStorage(MakeCheckpointStorageID()) , ComputeActor(computeActor) , PendingCheckpoint(Task) { -} - +} + void TDqComputeActorCheckpoints::Init(NActors::TActorId computeActorId, NActors::TActorId checkpointsId) { EventsQueue.Init(TxId, computeActorId, checkpointsId); } @@ -133,28 +133,28 @@ STRICT_STFUNC(TDqComputeActorCheckpoints::StateFunc, hFunc(TEvRetryQueuePrivate::TEvRetry, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); ) - + namespace { // Get generation for protobuf event. template <class E> auto GetGeneration(const E& ev) -> decltype(ev->Get()->Record.GetGeneration()) { return ev->Get()->Record.GetGeneration(); -} - +} + // Get generation for local event. -template <class E> +template <class E> auto GetGeneration(const E& ev) -> decltype(ev->Get()->Generation) { return ev->Get()->Generation; } - + ui64 GetGeneration(const TEvDqCompute::TEvSaveTaskStateResult::TPtr& ev) { return ev->Get()->Record.GetCheckpoint().GetGeneration(); -} - +} + } // anonymous namespace -template <class E> +template <class E> bool TDqComputeActorCheckpoints::ShouldIgnoreOldCoordinator(const E& ev, bool verifyOnGenerationFromFuture) { const ui64 generation = GetGeneration(ev); Y_VERIFY(!verifyOnGenerationFromFuture || !CheckpointCoordinator || generation <= CheckpointCoordinator->Generation, @@ -165,72 +165,72 @@ bool TDqComputeActorCheckpoints::ShouldIgnoreOldCoordinator(const E& ev, bool ve return true; } return false; -} - +} + void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr& ev) { if (ShouldIgnoreOldCoordinator(ev, false)) { return; } const ui64 newGeneration = ev->Get()->Record.GetGeneration(); - LOG_I("Got TEvNewCheckpointCoordinator event: generation " << newGeneration << ", actorId: " << ev->Sender); - + LOG_I("Got TEvNewCheckpointCoordinator event: generation " << newGeneration << ", actorId: " << ev->Sender); + if (CheckpointCoordinator && CheckpointCoordinator->Generation == newGeneration) { // The same message. It was retry from coordinator. - Y_VERIFY(CheckpointCoordinator->ActorId == ev->Sender, "there shouldn't be two different checkpoint coordinators with the same generation"); + Y_VERIFY(CheckpointCoordinator->ActorId == ev->Sender, "there shouldn't be two different checkpoint coordinators with the same generation"); Y_VERIFY(GraphId == ev->Get()->Record.GetGraphId()); - return; - } - + return; + } + if (CheckpointCoordinator) { LOG_I("Replace stale checkpoint coordinator (generation = " << CheckpointCoordinator->Generation << ") with a new one"); } else { LOG_I("Assign checkpoint coordinator (generation = " << newGeneration << ")"); } - CheckpointCoordinator = TCheckpointCoordinatorId(ev->Sender, newGeneration); + CheckpointCoordinator = TCheckpointCoordinatorId(ev->Sender, newGeneration); GraphId = ev->Get()->Record.GetGraphId(); - + EventsQueue.OnNewRecipientId(ev->Sender); Y_VERIFY(EventsQueue.OnEventReceived(ev->Get())); EventsQueue.Send(new TEvDqCompute::TEvNewCheckpointCoordinatorAck()); - if (PendingCheckpoint) { - LOG_I("Drop pending checkpoint since coordinator is stale"); - PendingCheckpoint.Clear(); - ComputeActor->ResumeInputs(); - } -} - + if (PendingCheckpoint) { + LOG_I("Drop pending checkpoint since coordinator is stale"); + PendingCheckpoint.Clear(); + ComputeActor->ResumeInputs(); + } +} + void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr& ev) { if (ShouldIgnoreOldCoordinator(ev)) { return; } YQL_ENSURE(IngressTask, "Shouldn't inject barriers into non-ingress tasks"); - YQL_ENSURE(!PendingCheckpoint); - - PendingCheckpoint = ev->Get()->Record.GetCheckpoint(); - LOG_CP_I("Got TEvInjectCheckpoint"); - ComputeActor->ResumeExecution(); -} - + YQL_ENSURE(!PendingCheckpoint); + + PendingCheckpoint = ev->Get()->Record.GetCheckpoint(); + LOG_CP_I("Got TEvInjectCheckpoint"); + ComputeActor->ResumeExecution(); +} + void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvSaveTaskStateResult::TPtr& ev) { if (ShouldIgnoreOldCoordinator(ev)) { - return; - } + return; + } EventsQueue.Send(ev->Release().Release(), ev->Cookie); -} - +} + void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::TPtr& ev) { if (ShouldIgnoreOldCoordinator(ev)) { - return; - } - + return; + } + if (!EventsQueue.OnEventReceived(ev)) { return; } - ComputeActor->Stop(); + ComputeActor->Stop(); TaskLoadPlan = ev->Get()->Record.GetStateLoadPlan(); const auto& checkpoint = ev->Get()->Record.GetCheckpoint(); LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got TEvRestoreFromCheckpoint event with plan " << TaskLoadPlan); @@ -271,38 +271,38 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint:: break; } } -} - +} + void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPtr& ev) { if (ShouldIgnoreOldCoordinator(ev)) { - return; - } - - auto& checkpoint = ev->Get()->Checkpoint; + return; + } + + auto& checkpoint = ev->Get()->Checkpoint; std::vector<ui64> taskIds; size_t taskIdsSize = 1; if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) { taskIds = TaskIdsFromLoadPlan(TaskLoadPlan); taskIdsSize = taskIds.size(); } - + if (!ev->Get()->Issues.Empty()) { LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) << "] Can't get state from storage: " << ev->Get()->Issues.ToString()); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie); return; } - + if (ev->Get()->States.size() != taskIdsSize) { LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got unexpected states count. States count: " << ev->Get()->States.size() << ". Expected states count: " << taskIdsSize); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie); - return; - } - + return; + } + LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got TEvGetTaskStateResult event, restoring state"); - try { + try { if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN) { ComputeActor->LoadState(ev->Get()->States[0]); } else if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) { @@ -313,41 +313,41 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt NDqProto::NDqStateLoadPlan::EStateType_Name(TaskLoadPlan.GetStateType()).c_str(), static_cast<int>(TaskLoadPlan.GetStateType())); } - } catch (const std::exception& e) { + } catch (const std::exception& e) { LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) << "] Failed to load state: " << e.what()); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), ev->Cookie); LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Checkpoint state restoration aborted"); - return; - } - + return; + } + EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), ev->Cookie); LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Checkpoint state restored"); -} - +} + void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRun::TPtr& ev) { EventsQueue.OnEventReceived(ev); } void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvCommitState::TPtr& ev) { if (ShouldIgnoreOldCoordinator(ev)) { - return; - } - + return; + } + if (!EventsQueue.OnEventReceived(ev)) { return; } - // No actual commit at the moment: will be done in further commits - auto checkpoint = ev->Get()->Record.GetCheckpoint(); + // No actual commit at the moment: will be done in further commits + auto checkpoint = ev->Get()->Record.GetCheckpoint(); ComputeActor->CommitState(checkpoint); EventsQueue.Send(new TEvDqCompute::TEvStateCommitted(checkpoint.GetId(), checkpoint.GetGeneration(), Task.GetId()), ev->Cookie); -} - -void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvPoison::TPtr&) { - LOG_D("pass away"); - PassAway(); -} - +} + +void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvPoison::TPtr&) { + LOG_D("pass away"); + PassAway(); +} + void TDqComputeActorCheckpoints::Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { LOG_I("Handle disconnected node " << ev->Get()->NodeId); EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId); @@ -363,62 +363,62 @@ void TDqComputeActorCheckpoints::Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev EventsQueue.Retry(); } -bool TDqComputeActorCheckpoints::HasPendingCheckpoint() const { +bool TDqComputeActorCheckpoints::HasPendingCheckpoint() const { return PendingCheckpoint; -} - +} + bool TDqComputeActorCheckpoints::ComputeActorStateSaved() const { return PendingCheckpoint && PendingCheckpoint.SavedComputeActorState; } -void TDqComputeActorCheckpoints::DoCheckpoint() { - Y_VERIFY(CheckpointCoordinator); +void TDqComputeActorCheckpoints::DoCheckpoint() { + Y_VERIFY(CheckpointCoordinator); Y_VERIFY(PendingCheckpoint); - - LOG_CP_I("Performing task checkpoint"); - if (SaveState()) { - LOG_CP_D("Injecting checkpoint barrier to outputs"); + + LOG_CP_I("Performing task checkpoint"); + if (SaveState()) { + LOG_CP_D("Injecting checkpoint barrier to outputs"); ComputeActor->InjectBarrierToOutputs(*PendingCheckpoint.Checkpoint); TryToSavePendingCheckpoint(); - } -} - -[[nodiscard]] -bool TDqComputeActorCheckpoints::SaveState() { - LOG_CP_D("Saving task state"); - - try { + } +} + +[[nodiscard]] +bool TDqComputeActorCheckpoints::SaveState() { + LOG_CP_D("Saving task state"); + + try { Y_VERIFY(!PendingCheckpoint.SavedComputeActorState); PendingCheckpoint.SavedComputeActorState = true; ComputeActor->SaveState(*PendingCheckpoint.Checkpoint, PendingCheckpoint.ComputeActorState); - } catch (const std::exception& e) { + } catch (const std::exception& e) { PendingCheckpoint.Clear(); - LOG_CP_E("Failed to save state: " << e.what()); - + LOG_CP_E("Failed to save state: " << e.what()); + auto resultEv = MakeHolder<TEvDqCompute::TEvSaveTaskStateResult>(); resultEv->Record.MutableCheckpoint()->CopyFrom(*PendingCheckpoint.Checkpoint); - resultEv->Record.SetTaskId(Task.GetId()); + resultEv->Record.SetTaskId(Task.GetId()); resultEv->Record.SetStatus(NDqProto::TEvSaveTaskStateResult::INTERNAL_ERROR); EventsQueue.Send(std::move(resultEv)); - - return false; - } - + + return false; + } + LOG_CP_D("Compute actor state saved"); - return true; -} - + return true; +} + void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId) { - if (!PendingCheckpoint) { - PendingCheckpoint = checkpoint; - } else { + if (!PendingCheckpoint) { + PendingCheckpoint = checkpoint; + } else { YQL_ENSURE(PendingCheckpoint.Checkpoint->GetGeneration() == checkpoint.GetGeneration()); YQL_ENSURE(PendingCheckpoint.Checkpoint->GetId() == checkpoint.GetId()); - } - LOG_CP_I("Got checkpoint barrier from channel " << channelId); + } + LOG_CP_I("Got checkpoint barrier from channel " << channelId); ComputeActor->ResumeExecution(); -} - +} + void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) { Y_VERIFY(CheckpointCoordinator); Y_VERIFY(checkpoint.GetGeneration() <= CheckpointCoordinator->Generation); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h index 27fd851338..974a3089e2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h @@ -1,13 +1,13 @@ -#pragma once - -#include "dq_compute_actor.h" +#pragma once + +#include "dq_compute_actor.h" #include "dq_compute_actor_sinks.h" #include "retry_queue.h" - + #include <ydb/library/yql/dq/common/dq_common.h> -#include <library/cpp/actors/core/log.h> - +#include <library/cpp/actors/core/log.h> + #include <util/generic/ptr.h> #include <algorithm> @@ -19,21 +19,21 @@ enum ECheckpointingMode; } // namespace NYql::NDqProto namespace NYql::NDq { - + NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const NDqProto::TDqTask& task); class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpoints> { - struct TCheckpointCoordinatorId { + struct TCheckpointCoordinatorId { NActors::TActorId ActorId; - ui64 Generation; - + ui64 Generation; + TCheckpointCoordinatorId(NActors::TActorId actorId, ui64 generation) - : ActorId(actorId) - , Generation(generation) { - } - }; - + : ActorId(actorId) + , Generation(generation) { + } + }; + struct TPendingCheckpoint { TPendingCheckpoint(const NDqProto::TDqTask& task) : SinksCount(GetSinksCount(task)) @@ -63,42 +63,42 @@ class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpo bool SavedComputeActorState = false; }; -public: +public: static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR_CHECKPOINTS"; - struct ICallbacks { - [[nodiscard]] - virtual bool ReadyToCheckpoint() const = 0; + struct ICallbacks { + [[nodiscard]] + virtual bool ReadyToCheckpoint() const = 0; virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const = 0; virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0; virtual void InjectBarrierToOutputs(const NDqProto::TCheckpoint& checkpoint) = 0; - virtual void ResumeInputs() = 0; - - virtual void Start() = 0; - virtual void Stop() = 0; - virtual void ResumeExecution() = 0; - + virtual void ResumeInputs() = 0; + + virtual void Start() = 0; + virtual void Stop() = 0; + virtual void ResumeExecution() = 0; + virtual void LoadState(const NDqProto::TComputeActorState& state) = 0; - + virtual ~ICallbacks() = default; - }; - + }; + TDqComputeActorCheckpoints(const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor); void Init(NActors::TActorId computeActorId, NActors::TActorId checkpointsId); - [[nodiscard]] - bool HasPendingCheckpoint() const; + [[nodiscard]] + bool HasPendingCheckpoint() const; bool ComputeActorStateSaved() const; - void DoCheckpoint(); - bool SaveState(); + void DoCheckpoint(); + bool SaveState(); void RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId); - + // Sink actor support. void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint); void TryToSavePendingCheckpoint(); -private: - STATEFN(StateFunc); +private: + STATEFN(StateFunc); void Handle(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr&); void Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr&); void Handle(TEvDqCompute::TEvSaveTaskStateResult::TPtr&); @@ -106,33 +106,33 @@ private: void Handle(TEvDqCompute::TEvRestoreFromCheckpoint::TPtr&); void Handle(TEvDqCompute::TEvGetTaskStateResult::TPtr&); void Handle(TEvDqCompute::TEvRun::TPtr& ev); - void Handle(NActors::TEvents::TEvPoison::TPtr&); + void Handle(NActors::TEvents::TEvPoison::TPtr&); void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev); void Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev); - + void PassAway() override; // Validates generation and returns true if it is from old coordinator. - template <class E> + template <class E> bool ShouldIgnoreOldCoordinator(const E& ev, bool verifyOnGenerationFromFuture = true); - -private: + +private: const TTxId TxId; const NDqProto::TDqTask Task; const bool IngressTask; - + const NActors::TActorId CheckpointStorage; - TString GraphId; - + TString GraphId; + ICallbacks* ComputeActor = nullptr; - - TMaybe<TCheckpointCoordinatorId> CheckpointCoordinator; + + TMaybe<TCheckpointCoordinatorId> CheckpointCoordinator; TPendingCheckpoint PendingCheckpoint; TRetryEventsQueue EventsQueue; // Restore NYql::NDqProto::NDqStateLoadPlan::TTaskPlan TaskLoadPlan; -}; - +}; + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index deaadefb42..c9dfd4314a 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -2,7 +2,7 @@ #include "dq_compute_actor.h" #include "dq_compute_actor_channels.h" -#include "dq_compute_actor_checkpoints.h" +#include "dq_compute_actor_checkpoints.h" #include "dq_compute_actor_sinks.h" #include "dq_compute_actor_sources.h" #include "dq_compute_issues_buffer.h" @@ -59,7 +59,7 @@ constexpr ui32 IssuesBufferSize = 16; template<typename TDerived> class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived> , public TDqComputeActorChannels::ICallbacks - , public TDqComputeActorCheckpoints::ICallbacks + , public TDqComputeActorCheckpoints::ICallbacks , public IDqSourceActor::ICallbacks , public IDqSinkActor::ICallbacks { @@ -267,9 +267,9 @@ protected: } if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) { - Checkpoints->DoCheckpoint(); - } - + Checkpoints->DoCheckpoint(); + } + ProcessOutputsImpl(status); } @@ -400,12 +400,12 @@ protected: Channels->Receive(handle, NActors::TActivationContext::AsActorContext()); } - if (Checkpoints) { + if (Checkpoints) { TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), this->SelfId(), new NActors::TEvents::TEvPoison); Checkpoints->Receive(handle, NActors::TActivationContext::AsActorContext()); - } - + } + for (auto& [_, source] : SourcesMap) { if (source.Actor) { source.SourceActor->PassAway(); @@ -497,7 +497,7 @@ protected: } void ContinueExecute() { - if (!ResumeEventScheduled && Running) { + if (!ResumeEventScheduled && Running) { ResumeEventScheduled = true; this->Send(this->SelfId(), new TEvDqCompute::TEvResumeExecution()); } @@ -522,14 +522,14 @@ public: channel->Push(std::move(*channelData.MutableData())); } - if (channelData.HasCheckpoint()) { + if (channelData.HasCheckpoint()) { Y_VERIFY(inputChannel->CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED); Y_VERIFY(Checkpoints); - const auto& checkpoint = channelData.GetCheckpoint(); - inputChannel->Pause(checkpoint); - Checkpoints->RegisterCheckpoint(checkpoint, channelData.GetChannelId()); - } - + const auto& checkpoint = channelData.GetCheckpoint(); + inputChannel->Pause(checkpoint); + Checkpoints->RegisterCheckpoint(checkpoint, channelData.GetChannelId()); + } + if (channelData.GetFinished()) { channel->Finish(); } @@ -572,22 +572,22 @@ public: } protected: - bool ReadyToCheckpoint() const override { - for (auto& [id, channelInfo] : InputChannelsMap) { + bool ReadyToCheckpoint() const override { + for (auto& [id, channelInfo] : InputChannelsMap) { if (channelInfo.CheckpointingMode == NDqProto::CHECKPOINTING_MODE_DISABLED) { continue; } - if (!channelInfo.IsPaused()) { - return false; - } - if (!channelInfo.Channel->Empty()) { - return false; - } - } - return true; - } - + if (!channelInfo.IsPaused()) { + return false; + } + if (!channelInfo.Channel->Empty()) { + return false; + } + } + return true; + } + void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override { CA_LOG_D("Save state"); NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram(); @@ -602,8 +602,8 @@ protected: source.SourceActor->SaveState(checkpoint, sourceState); sourceState.SetInputIndex(inputIndex); } - } - + } + void CommitState(const NDqProto::TCheckpoint& checkpoint) override { CA_LOG_D("Commit state"); for (auto& [inputIndex, source] : SourcesMap) { @@ -614,20 +614,20 @@ protected: void InjectBarrierToOutputs(const NDqProto::TCheckpoint& checkpoint) override { Y_VERIFY(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED); - for (const auto& [id, channelInfo] : OutputChannelsMap) { + for (const auto& [id, channelInfo] : OutputChannelsMap) { channelInfo.Channel->Push(NDqProto::TCheckpoint(checkpoint)); - } + } for (const auto& [outputIndex, sink] : SinksMap) { sink.Sink->Push(NDqProto::TCheckpoint(checkpoint)); } - } - - void ResumeInputs() override { - for (auto& [id, channelInfo] : InputChannelsMap) { - channelInfo.Resume(); - } - } - + } + + void ResumeInputs() override { + for (auto& [id, channelInfo] : InputChannelsMap) { + channelInfo.Resume(); + } + } + void LoadState(const NDqProto::TComputeActorState& state) override { CA_LOG_D("Load state"); auto guard = BindAllocator(); @@ -653,19 +653,19 @@ protected: return; } ythrow yexception() << "Invalid state version " << version; - } - - void Start() override { - Running = true; + } + + void Start() override { + Running = true; State = NDqProto::COMPUTE_STATE_EXECUTING; - ContinueExecute(); - } - - void Stop() override { - Running = false; + ContinueExecute(); + } + + void Stop() override { + Running = false; State = NDqProto::COMPUTE_STATE_UNKNOWN; - } - + } + protected: struct TInputChannelInfo { ui64 ChannelId; @@ -674,28 +674,28 @@ protected: std::optional<NDqProto::TCheckpoint> PendingCheckpoint; const NDqProto::ECheckpointingMode CheckpointingMode; ui64 FreeSpace = 0; - + explicit TInputChannelInfo(ui64 channelId, NDqProto::ECheckpointingMode checkpointingMode) : ChannelId(channelId) , CheckpointingMode(checkpointingMode) { } - bool IsPaused() const { + bool IsPaused() const { return PendingCheckpoint.has_value(); - } - + } + void Pause(const NDqProto::TCheckpoint& checkpoint) { - YQL_ENSURE(!IsPaused()); + YQL_ENSURE(!IsPaused()); YQL_ENSURE(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED); - PendingCheckpoint = checkpoint; + PendingCheckpoint = checkpoint; Channel->Pause(); - } - - void Resume() { + } + + void Resume() { PendingCheckpoint.reset(); Channel->Resume(); - } + } }; struct TSourceInfo { @@ -785,9 +785,9 @@ protected: protected: void HandleExecuteBase(TEvDqCompute::TEvResumeExecution::TPtr&) { ResumeEventScheduled = false; - if (Running) { + if (Running) { DoExecute(); - } + } } void HandleExecuteBase(TEvDqCompute::TEvChannelsInfo::TPtr& ev) { @@ -876,15 +876,15 @@ protected: void HandleExecuteBase(TEvDqCompute::TEvRun::TPtr& ev) { CA_LOG_D("Got TEvRun from actor " << ev->Sender); - Start(); + Start(); // Event from coordinator should be processed to confirm seq no. TAutoPtr<NActors::IEventHandle> iev(ev.Release()); if (Checkpoints) { Checkpoints->Receive(iev, NActors::TActivationContext::AsActorContext()); } - } - + } + void HandleExecuteBase(TEvDqCompute::TEvStateRequest::TPtr& ev) { CA_LOG_D("Got TEvStateRequest from actor " << ev->Sender << " TaskId: " << Task.GetId() << " PingCookie: " << ev->Cookie); auto evState = MakeHolder<TEvDqCompute::TEvState>(); @@ -896,15 +896,15 @@ protected: } void HandleExecuteBase(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr& ev) { - if (!Checkpoints) { + if (!Checkpoints) { Checkpoints = new TDqComputeActorCheckpoints(TxId, Task, this); Checkpoints->Init(this->SelfId(), this->RegisterWithSameMailbox(Checkpoints)); Channels->SetCheckpointsSupport(); - } - TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), ev->Sender, ev->Release().Release()); + } + TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), ev->Sender, ev->Release().Release()); Checkpoints->Receive(handle, NActors::TActivationContext::AsActorContext()); - } - + } + void HandleExecuteBase(TEvDq::TEvAbortExecution::TPtr& ev) { TString message = ev->Get()->Record.GetMessage(); CA_LOG_E("Handle abort execution event from: " << ev->Sender @@ -992,10 +992,10 @@ private: NDqProto::TData data; NDqProto::TCheckpoint checkpoint; - - bool hasData = channel->Pop(data, bytes); - bool hasCheckpoint = channel->Pop(checkpoint); - if (!hasData && !hasCheckpoint) { + + bool hasData = channel->Pop(data, bytes); + bool hasCheckpoint = channel->Pop(checkpoint); + if (!hasData && !hasCheckpoint) { if (!channel->IsFinished()) { CA_LOG_D("output channelId: " << channel->GetChannelId() << ", nothing to send and is not finished"); return 0; // channel is empty and not finished yet @@ -1006,19 +1006,19 @@ private: const bool becameFinished = !wasFinished && outputChannel.Finished; ui32 dataSize = data.GetRaw().size(); - ui32 checkpointSize = checkpoint.ByteSize(); + ui32 checkpointSize = checkpoint.ByteSize(); NDqProto::TChannelData channelData; channelData.SetChannelId(channel->GetChannelId()); channelData.SetFinished(outputChannel.Finished); - if (hasData) { - channelData.MutableData()->Swap(&data); - } - if (hasCheckpoint) { - channelData.MutableCheckpoint()->Swap(&checkpoint); - CA_LOG_I("Resume inputs"); - ResumeInputs(); - } + if (hasData) { + channelData.MutableData()->Swap(&data); + } + if (hasCheckpoint) { + channelData.MutableCheckpoint()->Swap(&checkpoint); + CA_LOG_I("Resume inputs"); + ResumeInputs(); + } if (hasData || hasCheckpoint || becameFinished) { Channels->SendChannelData(std::move(channelData)); @@ -1467,7 +1467,7 @@ protected: const NDqProto::ECheckpointingMode CheckpointingMode; TIntrusivePtr<IDqTaskRunner> TaskRunner; TDqComputeActorChannels* Channels = nullptr; - TDqComputeActorCheckpoints* Checkpoints = nullptr; + TDqComputeActorCheckpoints* Checkpoints = nullptr; THashMap<ui64, TInputChannelInfo> InputChannelsMap; // Channel id -> Channel info THashMap<ui64, TSourceInfo> SourcesMap; // Input index -> Source info THashMap<ui64, TOutputChannelInfo> OutputChannelsMap; // Channel id -> Channel info @@ -1498,7 +1498,7 @@ protected: TProcessOutputsState ProcessOutputsState; private: - bool Running = true; + bool Running = true; TInstant LastSendStatsTime; }; diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make index c4b20bd502..ae47a3e5b0 100644 --- a/ydb/library/yql/dq/actors/compute/ya.make +++ b/ydb/library/yql/dq/actors/compute/ya.make @@ -9,7 +9,7 @@ SRCS( dq_compute_actor.cpp dq_async_compute_actor.cpp dq_compute_actor_channels.cpp - dq_compute_actor_checkpoints.cpp + dq_compute_actor_checkpoints.cpp dq_compute_actor_io_actors_factory.cpp dq_compute_actor_stats.cpp dq_compute_issues_buffer.cpp diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h index 698377a393..800c1818c3 100644 --- a/ydb/library/yql/dq/actors/dq_events_ids.h +++ b/ydb/library/yql/dq/actors/dq_events_ids.h @@ -33,18 +33,18 @@ struct TDqComputeEvents { ReservedKqp_EvScanInitActor, ReservedKqp_EvRemoteScanData, ReservedKqp_EvRemoteScanDataAck, - + EvRun = EventSpaceBegin(TDqEvents::ES_DQ_COMPUTE), EvNewCheckpointCoordinator, - EvInjectCheckpoint, - EvSaveTaskState, - EvSaveTaskStateResult, - EvCommitState, - EvStateCommitted, - EvRestoreFromCheckpoint, - EvRestoreFromCheckpointResult, - EvGetTaskState, + EvInjectCheckpoint, + EvSaveTaskState, + EvSaveTaskStateResult, + EvCommitState, + EvStateCommitted, + EvRestoreFromCheckpoint, + EvRestoreFromCheckpointResult, + EvGetTaskState, EvGetTaskStateResult, EvStateRequest, EvNewCheckpointCoordinatorAck, diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto index 0e38ab3bbc..6d70a971ae 100644 --- a/ydb/library/yql/dq/actors/protos/dq_events.proto +++ b/ydb/library/yql/dq/actors/protos/dq_events.proto @@ -11,7 +11,7 @@ import "ydb/public/api/protos/ydb_issue_message.proto"; message TCheckpoint { optional uint64 Id = 1; - optional uint64 Generation = 2; + optional uint64 Generation = 2; }; /* @@ -19,7 +19,7 @@ message TCheckpoint { 1) Data 2) Checkpoint 3) Finished -*/ +*/ message TChannelData { optional uint64 ChannelId = 1; optional NYql.NDqProto.TData Data = 2; @@ -27,10 +27,10 @@ message TChannelData { optional TCheckpoint Checkpoint = 4; }; -message TEvRun { +message TEvRun { optional TMessageTransportMeta TransportMeta = 100; -} - +} + message TEvChannelsInfo { repeated NYql.NDqProto.TChannel Update = 1; }; @@ -102,76 +102,76 @@ message TComputeRuntimeSettings { optional uint32 TasksOnNodeCount = 5; // approx optional TRlPath RlPath = 7; } - -message TEvNewCheckpointCoordinator { - optional uint64 Generation = 1; - optional string GraphId = 2; + +message TEvNewCheckpointCoordinator { + optional uint64 Generation = 1; + optional string GraphId = 2; optional TMessageTransportMeta TransportMeta = 100; -} - +} + message TEvNewCheckpointCoordinatorAck { optional TMessageTransportMeta TransportMeta = 100; } -message TEvInjectCheckpoint { - optional TCheckpoint Checkpoint = 1; - optional uint64 Generation = 2; +message TEvInjectCheckpoint { + optional TCheckpoint Checkpoint = 1; + optional uint64 Generation = 2; optional TMessageTransportMeta TransportMeta = 100; -} - -message TEvSaveTaskStateResult { - enum EStatus { - UNSPECIFIED = 0; - OK = 1; - STORAGE_ERROR = 2; - INTERNAL_ERROR = 3; - STATE_TOO_BIG = 4; - } - optional TCheckpoint Checkpoint = 1; - optional uint64 TaskId = 2; - optional EStatus Status = 3; - optional uint64 StateSizeBytes = 4; +} + +message TEvSaveTaskStateResult { + enum EStatus { + UNSPECIFIED = 0; + OK = 1; + STORAGE_ERROR = 2; + INTERNAL_ERROR = 3; + STATE_TOO_BIG = 4; + } + optional TCheckpoint Checkpoint = 1; + optional uint64 TaskId = 2; + optional EStatus Status = 3; + optional uint64 StateSizeBytes = 4; optional TMessageTransportMeta TransportMeta = 100; -} - -message TEvCommitState { - optional TCheckpoint Checkpoint = 1; - optional uint64 Generation = 2; +} + +message TEvCommitState { + optional TCheckpoint Checkpoint = 1; + optional uint64 Generation = 2; optional TMessageTransportMeta TransportMeta = 100; -} - -message TEvStateCommitted { - optional TCheckpoint Checkpoint = 1; - optional uint64 TaskId = 2; +} + +message TEvStateCommitted { + optional TCheckpoint Checkpoint = 1; + optional uint64 TaskId = 2; optional TMessageTransportMeta TransportMeta = 100; -} - -message TEvRestoreFromCheckpoint { - optional TCheckpoint Checkpoint = 1; - optional uint64 Generation = 2; +} + +message TEvRestoreFromCheckpoint { + optional TCheckpoint Checkpoint = 1; + optional uint64 Generation = 2; optional NDqStateLoadPlan.TTaskPlan StateLoadPlan = 3; optional TMessageTransportMeta TransportMeta = 100; -} - -message TEvRestoreFromCheckpointResult { - enum ERestoreStatus { - UNSPECIFIED = 0; - OK = 1; - STORAGE_ERROR = 2; - INTERNAL_ERROR = 3; - } - optional TCheckpoint Checkpoint = 1; - optional uint64 TaskId = 2; - optional ERestoreStatus Status = 3; +} + +message TEvRestoreFromCheckpointResult { + enum ERestoreStatus { + UNSPECIFIED = 0; + OK = 1; + STORAGE_ERROR = 2; + INTERNAL_ERROR = 3; + } + optional TCheckpoint Checkpoint = 1; + optional uint64 TaskId = 2; + optional ERestoreStatus Status = 3; optional TMessageTransportMeta TransportMeta = 100; -} +} message TMessageTransportMeta { optional uint64 SeqNo = 1; // SeqNo of message diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index 211d4ac39a..bfa0238d33 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -546,17 +546,17 @@ public: } void Push(NDqProto::TCheckpoint&& checkpoint) override { - YQL_ENSURE(!Checkpoint); - Checkpoint.ConstructInPlace(std::move(checkpoint)); - } - + YQL_ENSURE(!Checkpoint); + Checkpoint.ConstructInPlace(std::move(checkpoint)); + } + [[nodiscard]] bool Pop(NDqProto::TData& data, ui64 bytes) override { LOG("Pop request, rows in memory: " << DataHead.size() << "/" << DataTail.size() << ", spilled rows: " << SpilledRows << ", spilled blobs: " << SpilledBlobs.size() << ", finished: " << Finished << ", requested: " << bytes << ", maxChunkBytes: " << MaxChunkBytes); - if (!HasData()) { + if (!HasData()) { if (Finished) { data.SetTransportVersion(DataSerializer.GetTransportVersion()); data.SetRows(0); @@ -670,15 +670,15 @@ public: [[nodiscard]] bool Pop(NDqProto::TCheckpoint& checkpoint) override { - if (!HasData() && Checkpoint) { + if (!HasData() && Checkpoint) { checkpoint = std::move(*Checkpoint); Checkpoint = Nothing(); - return true; - } - return false; - } - - [[nodiscard]] + return true; + } + return false; + } + + [[nodiscard]] bool PopAll(NDqProto::TData& data) override { YQL_ENSURE(!SpilledRows); YQL_ENSURE(DataTail.empty()); @@ -742,10 +742,10 @@ public: } } - bool HasData() const override { - return !DataHead.empty() || SpilledRows != 0 || !DataTail.empty(); - } - + bool HasData() const override { + return !DataHead.empty() || SpilledRows != 0 || !DataTail.empty(); + } + bool IsFinished() const override { return Finished && !HasData(); } @@ -822,7 +822,7 @@ private: ui64 SpilledRows = 0; bool Finished = false; - + TMaybe<NDqProto::TCheckpoint> Checkpoint; }; diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h index f7a4887d2d..57f794ae8d 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.h +++ b/ydb/library/yql/dq/runtime/dq_output_channel.h @@ -40,7 +40,7 @@ public: [[nodiscard]] virtual bool Pop(NDqProto::TData& data, ui64 bytes) = 0; // Pop chechpoint. Checkpoints may be taken from channel even after it is finished. - [[nodiscard]] + [[nodiscard]] virtual bool Pop(NDqProto::TCheckpoint& checkpoint) = 0; // Only for data-queries // TODO: remove this method and create independent Data- and Stream-query implementations. diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 10900352c3..7192c6fb38 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -620,14 +620,14 @@ public: return Stats.get(); } - TString Save() const override { - return ProgramParsed.CompGraph->SaveGraphState(); - } - - void Load(TStringBuf in) override { - ProgramParsed.CompGraph->LoadGraphState(in); - } - + TString Save() const override { + return ProgramParsed.CompGraph->SaveGraphState(); + } + + void Load(TStringBuf in) override { + ProgramParsed.CompGraph->LoadGraphState(in); + } + private: NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv() { return Context.TypeEnv ? *Context.TypeEnv : *SelfTypeEnv; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index a0cc816e91..56c902c858 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -176,10 +176,10 @@ public: virtual void UpdateStats() = 0; virtual const TDqTaskRunnerStats* GetStats() const = 0; - - [[nodiscard]] - virtual TString Save() const = 0; - virtual void Load(TStringBuf in) = 0; + + [[nodiscard]] + virtual TString Save() const = 0; + virtual void Load(TStringBuf in) = 0; }; TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings, diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index cdcbd32ef5..00b8bd3540 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -616,10 +616,10 @@ public: } } - bool Empty() const override { - ythrow yexception() << "unimplemented"; - } - + bool Empty() const override { + ythrow yexception() << "unimplemented"; + } + void Push(NDqProto::TData&& data) override { try { return Delegate->Push(std::move(data)); @@ -957,10 +957,10 @@ public: } void Push(NDqProto::TCheckpoint&& checkpoint) override { - Y_UNUSED(checkpoint); - ythrow yexception() << "unimplemented"; - } - + Y_UNUSED(checkpoint); + ythrow yexception() << "unimplemented"; + } + void Finish() override { try { NDqProto::TCommandHeader header; @@ -976,11 +976,11 @@ public: // |> // <| consumer methods - - bool HasData() const override { - ythrow yexception() << "unimplemented"; - } - + + bool HasData() const override { + ythrow yexception() << "unimplemented"; + } + bool IsFinished() const override { try { return Delegate->IsFinished(); @@ -998,11 +998,11 @@ public: TaskRunner->RaiseException(); } } - + bool Pop(NDqProto::TCheckpoint&) override { return false; - } - + } + // Only for data-queries // TODO: remove this method and create independent Data- and Stream-query implementations. // Stream-query implementation should be without PopAll method. @@ -1620,15 +1620,15 @@ public: } } - TString Save() const override { - ythrow yexception() << "unimplemented"; - } - - void Load(TStringBuf in) override { - Y_UNUSED(in); - ythrow yexception() << "unimplemented"; - } + TString Save() const override { + ythrow yexception() << "unimplemented"; + } + void Load(TStringBuf in) override { + Y_UNUSED(in); + ythrow yexception() << "unimplemented"; + } + private: TIntrusivePtr<TTaskRunner> Delegate; NDqProto::TDqTask Task; |