aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbeloshabskiy <beloshabskiy@yandex-team.ru>2022-02-10 16:51:13 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:51:13 +0300
commit6b612367cdcddbe5750d116472ea2b621e6b39fa (patch)
tree7ed9159b401b0c92bfcc0ac0d9bb3504b8144711
parentb2519d17aba308e7ddee5687eeb85103642988eb (diff)
downloadydb-6b612367cdcddbe5750d116472ea2b621e6b39fa.tar.gz
Restoring authorship annotation for <beloshabskiy@yandex-team.ru>. Commit 1 of 2.
-rw-r--r--build/rules/kikimr.policy2
-rw-r--r--ydb/core/base/events.h2
-rw-r--r--ydb/core/driver_lib/run/factories.h2
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp6
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h8
-rw-r--r--ydb/core/driver_lib/run/run.cpp2
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/core/testlib/test_client.cpp2
-rw-r--r--ydb/core/ymq/actor/serviceid.h2
-rw-r--r--ydb/core/yq/libs/actors/logging/log.h22
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp10
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.cpp4
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.h4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h18
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/events/events.h14
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/gc.cpp6
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp372
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ya.make4
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp380
-rw-r--r--ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp256
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp508
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h214
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp32
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h26
-rw-r--r--ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp94
-rw-r--r--ydb/core/yq/libs/checkpointing/pending_checkpoint.h100
-rw-r--r--ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp396
-rw-r--r--ydb/core/yq/libs/checkpointing/ut/ya.make28
-rw-r--r--ydb/core/yq/libs/checkpointing/utils.cpp18
-rw-r--r--ydb/core/yq/libs/checkpointing/utils.h20
-rw-r--r--ydb/core/yq/libs/checkpointing/ya.make52
-rw-r--r--ydb/core/yq/libs/checkpointing_common/defs.cpp6
-rw-r--r--ydb/core/yq/libs/checkpointing_common/defs.h18
-rw-r--r--ydb/core/yq/libs/config/protos/yq_config.proto2
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp28
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp6
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/ut/ya.make2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp14
-rw-r--r--ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h36
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp2
-rw-r--r--ydb/core/yq/libs/init/init.cpp6
-rw-r--r--ydb/core/yq/libs/init/init.h6
-rw-r--r--ydb/core/yq/libs/ya.make4
-rw-r--r--ydb/library/folder_service/events.h64
-rw-r--r--ydb/library/folder_service/folder_service.cpp20
-rw-r--r--ydb/library/folder_service/folder_service.h12
-rw-r--r--ydb/library/folder_service/mock/mock_folder_service.cpp76
-rw-r--r--ydb/library/folder_service/mock/mock_folder_service.h18
-rw-r--r--ydb/library/folder_service/mock/ya.make28
-rw-r--r--ydb/library/folder_service/proto/config.proto18
-rw-r--r--ydb/library/folder_service/proto/folder_service.proto72
-rw-r--r--ydb/library/folder_service/proto/ya.make22
-rw-r--r--ydb/library/folder_service/ya.make34
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_checkpoints.h12
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h156
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp250
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h94
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h170
-rw-r--r--ydb/library/yql/dq/actors/compute/ya.make2
-rw-r--r--ydb/library/yql/dq/actors/dq_events_ids.h18
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_events.proto112
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp34
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.h2
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp16
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h8
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp48
69 files changed, 2014 insertions, 2014 deletions
diff --git a/build/rules/kikimr.policy b/build/rules/kikimr.policy
index 5bb6426e30..0a0d85e495 100644
--- a/build/rules/kikimr.policy
+++ b/build/rules/kikimr.policy
@@ -15,7 +15,7 @@ ALLOW solomon/ -> kikimr/core/protos
ALLOW solomon/ -> kikimr/public/lib/deprecated/kicli
ALLOW solomon/ -> kikimr/library/mkql_proto/protos
ALLOW solomon/ -> kikimr/yq/libs/config/protos
-ALLOW solomon/ -> kikimr/library/folder_service/proto
+ALLOW solomon/ -> kikimr/library/folder_service/proto
ALLOW solomon/ -> kikimr/library/login/protos
# temporary (FIXME: gvit)
ALLOW yweb/robot/fetcher/generic_actors -> kikimr/core/protos
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index f5fedfe19b..66cef140eb 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -141,7 +141,7 @@ struct TKikimrEvents : TEvents {
ES_SEQUENCEPROXY, // 4217
ES_CLOUD_STORAGE,
ES_CLOUD_STORAGE_PRIVATE,
- ES_FOLDER_SERVICE_ADAPTER,
+ ES_FOLDER_SERVICE_ADAPTER,
ES_PQ_PARTITION_WRITER,
ES_YDB_PROXY,
ES_REPLICATION_CONTROLLER,
diff --git a/ydb/core/driver_lib/run/factories.h b/ydb/core/driver_lib/run/factories.h
index 41a45b44e0..5601d7ee5a 100644
--- a/ydb/core/driver_lib/run/factories.h
+++ b/ydb/core/driver_lib/run/factories.h
@@ -13,7 +13,7 @@
#include <ydb/library/folder_service/proto/config.pb.h>
#include <ydb/library/pdisk_io/aio.h>
#include <ydb/core/yq/libs/config/protos/audit.pb.h>
-
+
#include <ydb/library/yql/providers/pq/cm_client/interface/client.h>
#include <library/cpp/actors/core/actorsystem.h>
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 819c1478d1..6766787795 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -138,7 +138,7 @@
#include <ydb/library/folder_service/folder_service.h>
#include <ydb/library/folder_service/proto/config.pb.h>
-
+
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
#include <library/cpp/actors/protos/services_common.pb.h>
@@ -2084,9 +2084,9 @@ void TLeaseHolderInitializer::InitializeServices(NActors::TActorSystemSetup* set
}
}
-TSqsServiceInitializer::TSqsServiceInitializer(const TKikimrRunConfig& runConfig, const std::shared_ptr<TModuleFactories>& factories)
+TSqsServiceInitializer::TSqsServiceInitializer(const TKikimrRunConfig& runConfig, const std::shared_ptr<TModuleFactories>& factories)
: IKikimrServicesInitializer(runConfig)
- , Factories(factories)
+ , Factories(factories)
{
}
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 407ce1bb7b..0bc9d47b5e 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -413,12 +413,12 @@ public:
class TSqsServiceInitializer : public IKikimrServicesInitializer {
public:
- TSqsServiceInitializer(const TKikimrRunConfig& runConfig, const std::shared_ptr<TModuleFactories>& factories);
+ TSqsServiceInitializer(const TKikimrRunConfig& runConfig, const std::shared_ptr<TModuleFactories>& factories);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
-
-private:
- std::shared_ptr<TModuleFactories> Factories;
+
+private:
+ std::shared_ptr<TModuleFactories> Factories;
};
class TConfigsDispatcherInitializer : public IKikimrServicesInitializer {
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index a4f74aa4e0..956b8b3b26 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1277,7 +1277,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
}
if (serviceMask.EnableSqs) {
- sil->AddServiceInitializer(new TSqsServiceInitializer(runConfig, ModuleFactories));
+ sil->AddServiceInitializer(new TSqsServiceInitializer(runConfig, ModuleFactories));
}
if (serviceMask.EnableConfigsDispatcher) {
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index c17c8a7dc3..59638f817b 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -285,7 +285,7 @@ enum EServiceKikimr {
STREAMS_STORAGE_SERVICE = 1013;
STREAMS_SCHEDULER_SERVICE = 1014;
STREAMS_RESOURCE_SERVICE = 1015;
- STREAMS_CHECKPOINT_COORDINATOR = 1016;
+ STREAMS_CHECKPOINT_COORDINATOR = 1016;
STREAMS_CONTROL_PLANE_SERVICE = 1017;
STREAMS_GRAND_LEADER_SERVICE = 1018;
STREAMS_META_STORAGE_SERVICE = 1019;
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index d4907c26f1..2f490afe54 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -810,7 +810,7 @@ namespace Tests {
&appData,
"TestTenant",
nullptr, // MakeIntrusive<NPq::NConfigurationManager::TConnections>(),
- yqSharedResources,
+ yqSharedResources,
NKikimr::NFolderService::CreateMockFolderServiceActor,
NYq::CreateMockYqAuditServiceActor,
ydbCredFactory,
diff --git a/ydb/core/ymq/actor/serviceid.h b/ydb/core/ymq/actor/serviceid.h
index 7219d918ba..37cb78e9f3 100644
--- a/ydb/core/ymq/actor/serviceid.h
+++ b/ydb/core/ymq/actor/serviceid.h
@@ -35,7 +35,7 @@ IActor* CreateSqsService(TMaybe<ui32> ydbPort = Nothing());
IActor* CreateSqsProxyService();
IActor* CreateSqsAccessService(const TString& address, const TString& pathToRootCA);
IActor* CreateSqsFolderService(const TString& address, const TString& pathToRootCA);
-IActor* CreateMockSqsFolderService();
+IActor* CreateMockSqsFolderService();
IActor* CreateSqsMeteringService();
} // namespace NKikimr::NSQS
diff --git a/ydb/core/yq/libs/actors/logging/log.h b/ydb/core/yq/libs/actors/logging/log.h
index a5a938f014..2478711447 100644
--- a/ydb/core/yq/libs/actors/logging/log.h
+++ b/ydb/core/yq/libs/actors/logging/log.h
@@ -65,17 +65,17 @@
#define LOG_STREAMS_RESOURCE_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_RESOURCE_SERVICE, logRecordStream)
#define LOG_STREAMS_RESOURCE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_RESOURCE_SERVICE, logRecordStream)
#define LOG_STREAMS_RESOURCE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_RESOURCE_SERVICE, logRecordStream)
-
-// Component: STREAMS_CHECKPOINT_COORDINATOR.
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
-#define LOG_STREAMS_CHECKPOINT_COORDINATOR_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+
+// Component: STREAMS_CHECKPOINT_COORDINATOR.
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
+#define LOG_STREAMS_CHECKPOINT_COORDINATOR_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream)
// Component: STREAMS_CONTROL_PLANE_SERVICE.
#define LOG_STREAMS_CONTROL_PLANE_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_CONTROL_PLANE_SERVICE, logRecordStream)
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 5549f8f254..14f2703dd0 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -1127,7 +1127,7 @@ private:
// out params:
gatewaysConfig,
clusters);
-
+
TVector<TDataProviderInitializer> dataProvidersInit;
const auto dbResolver = std::make_shared<TDatabaseAsyncResolverWithMeta>(TDatabaseAsyncResolverWithMeta(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver,
Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId, Params.AuthToken, Params.AccountIdSignatures, Connections));
@@ -1151,7 +1151,7 @@ private:
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory));
}
- {
+ {
NYql::TPqGatewayServices pqServices(
Params.Driver,
Params.PqCmConnections,
@@ -1159,10 +1159,10 @@ private:
std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()),
Params.FunctionRegistry
);
- const auto pqGateway = NYql::CreatePqNativeGateway(pqServices);
+ const auto pqGateway = NYql::CreatePqNativeGateway(pqServices);
dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver));
- }
-
+ }
+
{
auto solomonConfig = gatewaysConfig.GetSolomon();
auto solomonGateway = NYql::CreateSolomonGateway(solomonConfig);
diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp
index 1446cae1bb..e01ba1ca8f 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.cpp
+++ b/ydb/core/yq/libs/actors/run_actor_params.cpp
@@ -29,7 +29,7 @@ TRunActorParams::TRunActorParams(
TVector<YandexQuery::Connection> connections,
TVector<YandexQuery::Binding> bindings,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- THashMap<TString, TString> accountIdSignatures,
+ THashMap<TString, TString> accountIdSignatures,
YandexQuery::QueryContent::QueryType queryType,
YandexQuery::ExecuteMode executeMode,
const TString& resultId,
@@ -71,7 +71,7 @@ TRunActorParams::TRunActorParams(
, Bindings(std::move(bindings))
, CredentialsFactory(std::move(credentialsFactory))
, AccountIdSignatures(std::move(accountIdSignatures))
- , QueryType(queryType)
+ , QueryType(queryType)
, ExecuteMode(executeMode)
, ResultId(resultId)
, StateLoadMode(stateLoadMode)
diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h
index bad5c3c32b..7a6b7436bd 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.h
+++ b/ydb/core/yq/libs/actors/run_actor_params.h
@@ -43,7 +43,7 @@ struct TRunActorParams { // TODO2 : Change name
TVector<YandexQuery::Connection> connections,
TVector<YandexQuery::Binding> bindings,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- THashMap<TString, TString> accountIdSignatures,
+ THashMap<TString, TString> accountIdSignatures,
YandexQuery::QueryContent::QueryType queryType,
YandexQuery::ExecuteMode executeMode,
const TString& resultId,
@@ -90,7 +90,7 @@ struct TRunActorParams { // TODO2 : Change name
const TVector<YandexQuery::Binding> Bindings;
const NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
const THashMap<TString, TString> AccountIdSignatures;
- const YandexQuery::QueryContent::QueryType QueryType;
+ const YandexQuery::QueryContent::QueryType QueryType;
const YandexQuery::ExecuteMode ExecuteMode;
const TString ResultId;
const YandexQuery::StateLoadMode StateLoadMode;
diff --git a/ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h b/ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h
index 74923fc816..db4dab4fab 100644
--- a/ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h
+++ b/ydb/core/yq/libs/checkpoint_storage/checkpoint_storage.h
@@ -17,8 +17,8 @@ class ICheckpointStorage : public virtual TThrRefBase {
public:
using TGetCheckpointsResult = std::pair<TCheckpoints, NYql::TIssues>;
using TGetCoordinatorsResult = std::pair<TCoordinators, NYql::TIssues>;
- using TAddToStateSizeResult = std::pair<ui64, NYql::TIssues>;
- using TGetTotalCheckpointsStateSizeResult = std::pair<ui64, NYql::TIssues>;
+ using TAddToStateSizeResult = std::pair<ui64, NYql::TIssues>;
+ using TGetTotalCheckpointsStateSizeResult = std::pair<ui64, NYql::TIssues>;
using TCreateCheckpointResult = std::pair<TString, NYql::TIssues>; // graphDescId for subsequent usage.
virtual NThreading::TFuture<NYql::TIssues> Init() = 0;
@@ -50,7 +50,7 @@ public:
const TCoordinatorId& coordinator,
const TCheckpointId& checkpointId) = 0;
- virtual NThreading::TFuture<TGetCheckpointsResult> GetCheckpoints(const TString& graph) = 0;
+ virtual NThreading::TFuture<TGetCheckpointsResult> GetCheckpoints(const TString& graph) = 0;
virtual NThreading::TFuture<TGetCheckpointsResult> GetCheckpoints(
const TString& graph, const TVector<ECheckpointStatus>& statuses, ui64 limit, bool loadGraphDescription = false) = 0;
@@ -71,12 +71,12 @@ public:
const TString& graphId,
const TCheckpointId& checkpointUpperBound) = 0;
- virtual NThreading::TFuture<ICheckpointStorage::TAddToStateSizeResult> AddToStateSize(
- const TString& graphId,
- const TCheckpointId& checkpoint,
- ui64 size) = 0;
-
- virtual NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> GetTotalCheckpointsStateSize(const TString& graphId) = 0;
+ virtual NThreading::TFuture<ICheckpointStorage::TAddToStateSizeResult> AddToStateSize(
+ const TString& graphId,
+ const TCheckpointId& checkpoint,
+ ui64 size) = 0;
+
+ virtual NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> GetTotalCheckpointsStateSize(const TString& graphId) = 0;
};
using TCheckpointStoragePtr = TIntrusivePtr<ICheckpointStorage>;
diff --git a/ydb/core/yq/libs/checkpoint_storage/events/events.h b/ydb/core/yq/libs/checkpoint_storage/events/events.h
index 2641f85bae..2fd8659711 100644
--- a/ydb/core/yq/libs/checkpoint_storage/events/events.h
+++ b/ydb/core/yq/libs/checkpoint_storage/events/events.h
@@ -28,7 +28,7 @@ struct TEvCheckpointStorage {
EvGetCheckpointsMetadataResponse,
// Internal Storage events.
- EvNewCheckpointSucceeded,
+ EvNewCheckpointSucceeded,
EvEnd,
};
@@ -161,15 +161,15 @@ struct TEvCheckpointStorage {
struct TEvGetCheckpointsMetadataRequest
: NActors::TEventLocal<TEvGetCheckpointsMetadataRequest, EvGetCheckpointsMetadataRequest> {
explicit TEvGetCheckpointsMetadataRequest(TString graphId, TVector<ECheckpointStatus> statuses = TVector<ECheckpointStatus>(), ui64 limit = std::numeric_limits<ui64>::max(), bool loadGraphDescription = false)
- : GraphId(std::move(graphId))
- , Statuses(std::move(statuses))
+ : GraphId(std::move(graphId))
+ , Statuses(std::move(statuses))
, Limit(limit)
, LoadGraphDescription(loadGraphDescription) {
}
TString GraphId;
- TVector<ECheckpointStatus> Statuses;
- ui64 Limit;
+ TVector<ECheckpointStatus> Statuses;
+ ui64 Limit;
bool LoadGraphDescription = false;
};
@@ -185,8 +185,8 @@ struct TEvCheckpointStorage {
};
// note that no response exists
- struct TEvNewCheckpointSucceeded : NActors::TEventLocal<TEvNewCheckpointSucceeded, EvNewCheckpointSucceeded> {
- TEvNewCheckpointSucceeded(TCoordinatorId coordinatorId, TCheckpointId checkpointId)
+ struct TEvNewCheckpointSucceeded : NActors::TEventLocal<TEvNewCheckpointSucceeded, EvNewCheckpointSucceeded> {
+ TEvNewCheckpointSucceeded(TCoordinatorId coordinatorId, TCheckpointId checkpointId)
: CoordinatorId(std::move(coordinatorId))
, CheckpointId(std::move(checkpointId))
{
diff --git a/ydb/core/yq/libs/checkpoint_storage/gc.cpp b/ydb/core/yq/libs/checkpoint_storage/gc.cpp
index 2a0e73ecb4..6c98a83381 100644
--- a/ydb/core/yq/libs/checkpoint_storage/gc.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/gc.cpp
@@ -64,10 +64,10 @@ public:
private:
STRICT_STFUNC(StateFunc,
- HFunc(TEvCheckpointStorage::TEvNewCheckpointSucceeded, Handle);
+ HFunc(TEvCheckpointStorage::TEvNewCheckpointSucceeded, Handle);
)
- void Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx);
};
TActorGC::TActorGC(const TCheckpointStoragePtr& checkpointStorage, const TStateStoragePtr& stateStorage)
@@ -83,7 +83,7 @@ void TActorGC::Bootstrap(const TActorContext&)
LOG_STREAMS_STORAGE_SERVICE_INFO("Successfully bootstrapped storage GC " << SelfId());
}
-void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx)
+void TActorGC::Handle(TEvCheckpointStorage::TEvNewCheckpointSucceeded::TPtr& ev, const NActors::TActorContext& ctx)
{
const auto* event = ev->Get();
const auto& graphId = event->CoordinatorId.GraphId;
diff --git a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
index 07c8d5101a..d21291b2e5 100644
--- a/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/storage_proxy.cpp
@@ -8,12 +8,12 @@
#include <ydb/core/yq/libs/checkpointing_common/defs.h>
#include <ydb/core/yq/libs/checkpoint_storage/events/events.h>
-
+
#include <ydb/core/yq/libs/actors/logging/log.h>
#include <ydb/core/yq/libs/ydb/util.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
-
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -24,7 +24,7 @@
#define LOG_STORAGE_ASYNC_DEBUG(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_DEBUG, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);
#define LOG_STORAGE_ASYNC_INFO(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_INFO, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);
#define LOG_STORAGE_ASYNC_WARN(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_WARN, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);
-
+
namespace NYq {
using namespace NActors;
@@ -61,8 +61,8 @@ private:
hFunc(TEvCheckpointStorage::TEvAbortCheckpointRequest, Handle);
hFunc(TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest, Handle);
- hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskState, Handle);
- hFunc(NYql::NDq::TEvDqCompute::TEvGetTaskState, Handle);
+ hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskState, Handle);
+ hFunc(NYql::NDq::TEvDqCompute::TEvGetTaskState, Handle);
)
void Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::TPtr& ev);
@@ -74,8 +74,8 @@ private:
void Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest::TPtr& ev);
- void Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev);
- void Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev);
+ void Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev);
+ void Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev);
};
static void FillDefaultParameters(NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig, NConfig::TYdbStorageConfig& ydbStorageConfig) {
@@ -135,255 +135,255 @@ void TStorageProxy::Bootstrap() {
void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::TPtr& ev) {
const auto* event = ev->Get();
- LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] Got TEvRegisterCoordinatorRequest")
-
- CheckpointStorage->RegisterGraphCoordinator(event->CoordinatorId)
- .Apply([coordinatorId = event->CoordinatorId,
- cookie = ev->Cookie,
- sender = ev->Sender,
- context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
- auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>();
- response->Issues = issuesFuture.GetValue();
- if (response->Issues) {
- LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
- } else {
- LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] Graph registered")
- }
- LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
- context.Send(sender, response.release(), 0, cookie);
- });
+ LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] Got TEvRegisterCoordinatorRequest")
+
+ CheckpointStorage->RegisterGraphCoordinator(event->CoordinatorId)
+ .Apply([coordinatorId = event->CoordinatorId,
+ cookie = ev->Cookie,
+ sender = ev->Sender,
+ context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
+ auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>();
+ response->Issues = issuesFuture.GetValue();
+ if (response->Issues) {
+ LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
+ } else {
+ LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] Graph registered")
+ }
+ LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
+ context.Send(sender, response.release(), 0, cookie);
+ });
}
void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPtr& ev) {
const auto* event = ev->Get();
- LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvCreateCheckpointRequest")
+ LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvCreateCheckpointRequest")
- CheckpointStorage->GetTotalCheckpointsStateSize(event->CoordinatorId.GraphId)
- .Apply([checkpointId = event->CheckpointId,
- coordinatorId = event->CoordinatorId,
- cookie = ev->Cookie,
- sender = ev->Sender,
+ CheckpointStorage->GetTotalCheckpointsStateSize(event->CoordinatorId.GraphId)
+ .Apply([checkpointId = event->CheckpointId,
+ coordinatorId = event->CoordinatorId,
+ cookie = ev->Cookie,
+ sender = ev->Sender,
totalGraphCheckpointsSizeLimit = Config.GetStateStorageLimits().GetMaxGraphCheckpointsSizeBytes(),
- context = TActivationContext::AsActorContext()]
- (const NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult>& resultFuture) {
- auto result = resultFuture.GetValue();
- auto issues = result.second;
-
- if (issues) {
+ context = TActivationContext::AsActorContext()]
+ (const NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult>& resultFuture) {
+ auto result = resultFuture.GetValue();
+ auto issues = result.second;
+
+ if (issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
- return false;
- }
-
- auto totalGraphCheckpointsSize = result.first;
-
- if (totalGraphCheckpointsSize > totalGraphCheckpointsSizeLimit) {
- TStringStream ss;
- ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize;
- auto message = ss.Str();
- LOG_STORAGE_ASYNC_WARN(context, message)
- issues.AddIssue(message);
+ return false;
+ }
+
+ auto totalGraphCheckpointsSize = result.first;
+
+ if (totalGraphCheckpointsSize > totalGraphCheckpointsSizeLimit) {
+ TStringStream ss;
+ ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize;
+ auto message = ss.Str();
+ LOG_STORAGE_ASYNC_WARN(context, message)
+ issues.AddIssue(message);
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
- return false;
- }
- return true;
- })
+ return false;
+ }
+ return true;
+ })
.Apply([checkpointId = event->CheckpointId,
coordinatorId = event->CoordinatorId,
cookie = ev->Cookie,
sender = ev->Sender,
graphDesc = event->GraphDescription,
storage = CheckpointStorage]
- (const NThreading::TFuture<bool>& passedSizeLimitCheckFuture) {
- if (!passedSizeLimitCheckFuture.GetValue()) {
+ (const NThreading::TFuture<bool>& passedSizeLimitCheckFuture) {
+ if (!passedSizeLimitCheckFuture.GetValue()) {
return NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>();
- }
+ }
if (std::holds_alternative<TString>(graphDesc)) {
return storage->CreateCheckpoint(coordinatorId, checkpointId, std::get<TString>(graphDesc), ECheckpointStatus::Pending);
} else {
return storage->CreateCheckpoint(coordinatorId, checkpointId, std::get<NProto::TCheckpointGraphDescription>(graphDesc), ECheckpointStatus::Pending);
}
- })
- .Apply([checkpointId = event->CheckpointId,
- coordinatorId = event->CoordinatorId,
- cookie = ev->Cookie,
- sender = ev->Sender,
- context = TActivationContext::AsActorContext()]
+ })
+ .Apply([checkpointId = event->CheckpointId,
+ coordinatorId = event->CoordinatorId,
+ cookie = ev->Cookie,
+ sender = ev->Sender,
+ context = TActivationContext::AsActorContext()]
(const NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>& resultFuture) {
if (!resultFuture.Initialized()) { // didn't pass the size limit check
- return;
- }
+ return;
+ }
auto result = resultFuture.GetValue();
auto issues = result.second;
auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move(issues), result.first);
- if (response->Issues) {
+ if (response->Issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
- } else {
+ } else {
LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
- }
+ }
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
- context.Send(sender, response.release(), 0, cookie);
- });
+ context.Send(sender, response.release(), 0, cookie);
+ });
}
void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest::TPtr& ev) {
const auto* event = ev->Get();
- LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvSetCheckpointPendingCommitStatusRequest")
- CheckpointStorage->UpdateCheckpointStatus(event->CoordinatorId, event->CheckpointId, ECheckpointStatus::PendingCommit, ECheckpointStatus::Pending)
- .Apply([checkpointId = event->CheckpointId,
- coordinatorId = event->CoordinatorId,
- cookie = ev->Cookie,
- sender = ev->Sender,
- context = TActivationContext::AsActorContext()]
- (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
- auto issues = issuesFuture.GetValue();
- auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move(issues));
- if (response->Issues) {
- LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
- } else {
- LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
- }
- LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
- context.Send(sender, response.release(), 0, cookie);
- });
+ LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvSetCheckpointPendingCommitStatusRequest")
+ CheckpointStorage->UpdateCheckpointStatus(event->CoordinatorId, event->CheckpointId, ECheckpointStatus::PendingCommit, ECheckpointStatus::Pending)
+ .Apply([checkpointId = event->CheckpointId,
+ coordinatorId = event->CoordinatorId,
+ cookie = ev->Cookie,
+ sender = ev->Sender,
+ context = TActivationContext::AsActorContext()]
+ (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
+ auto issues = issuesFuture.GetValue();
+ auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move(issues));
+ if (response->Issues) {
+ LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
+ } else {
+ LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
+ }
+ LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
+ context.Send(sender, response.release(), 0, cookie);
+ });
}
void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::TPtr& ev) {
const auto* event = ev->Get();
- LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvCompleteCheckpointRequest")
- CheckpointStorage->UpdateCheckpointStatus(event->CoordinatorId, event->CheckpointId, ECheckpointStatus::Completed, ECheckpointStatus::PendingCommit)
- .Apply([checkpointId = event->CheckpointId,
- coordinatorId = event->CoordinatorId,
- cookie = ev->Cookie,
- sender = ev->Sender,
+ LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvCompleteCheckpointRequest")
+ CheckpointStorage->UpdateCheckpointStatus(event->CoordinatorId, event->CheckpointId, ECheckpointStatus::Completed, ECheckpointStatus::PendingCommit)
+ .Apply([checkpointId = event->CheckpointId,
+ coordinatorId = event->CoordinatorId,
+ cookie = ev->Cookie,
+ sender = ev->Sender,
gcEnabled = Config.GetCheckpointGarbageConfig().GetEnabled(),
- actorGC = ActorGC,
- context = TActivationContext::AsActorContext()]
- (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
- auto issues = issuesFuture.GetValue();
- auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move(issues));
- if (response->Issues) {
- LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
- } else {
- LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
- if (gcEnabled) {
- auto request = std::make_unique<TEvCheckpointStorage::TEvNewCheckpointSucceeded>(coordinatorId, checkpointId);
- LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
- context.Send(actorGC, request.release(), 0);
- }
- }
- LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
- context.Send(sender, response.release(), 0, cookie);
- });
+ actorGC = ActorGC,
+ context = TActivationContext::AsActorContext()]
+ (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
+ auto issues = issuesFuture.GetValue();
+ auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move(issues));
+ if (response->Issues) {
+ LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
+ } else {
+ LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
+ if (gcEnabled) {
+ auto request = std::make_unique<TEvCheckpointStorage::TEvNewCheckpointSucceeded>(coordinatorId, checkpointId);
+ LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
+ context.Send(actorGC, request.release(), 0);
+ }
+ }
+ LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
+ context.Send(sender, response.release(), 0, cookie);
+ });
}
void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr& ev) {
const auto* event = ev->Get();
- LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvAbortCheckpointRequest")
- CheckpointStorage->AbortCheckpoint(event->CoordinatorId,event->CheckpointId)
- .Apply([checkpointId = event->CheckpointId,
- coordinatorId = event->CoordinatorId,
- cookie = ev->Cookie,
- sender = ev->Sender,
- context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
- auto issues = issuesFuture.GetValue();
- auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move(issues));
- if (response->Issues) {
- LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
- } else {
- LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
- }
- LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
- context.Send(sender, response.release(), 0, cookie);
- });
+ LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->CoordinatorId << "] [" << event->CheckpointId << "] Got TEvAbortCheckpointRequest")
+ CheckpointStorage->AbortCheckpoint(event->CoordinatorId,event->CheckpointId)
+ .Apply([checkpointId = event->CheckpointId,
+ coordinatorId = event->CoordinatorId,
+ cookie = ev->Cookie,
+ sender = ev->Sender,
+ context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
+ auto issues = issuesFuture.GetValue();
+ auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move(issues));
+ if (response->Issues) {
+ LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
+ } else {
+ LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
+ }
+ LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
+ context.Send(sender, response.release(), 0, cookie);
+ });
}
void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest::TPtr& ev) {
const auto* event = ev->Get();
- LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->GraphId << "] Got TEvGetCheckpointsMetadataRequest")
+ LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << event->GraphId << "] Got TEvGetCheckpointsMetadataRequest")
CheckpointStorage->GetCheckpoints(event->GraphId, event->Statuses, event->Limit, event->LoadGraphDescription)
- .Apply([graphId = event->GraphId,
- cookie = ev->Cookie,
- sender = ev->Sender,
- context = TActivationContext::AsActorContext()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) {
- auto result = futureResult.GetValue();
- auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first, result.second);
- if (response->Issues) {
- LOG_STORAGE_ASYNC_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
- }
- LOG_STORAGE_ASYNC_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
- context.Send(sender, response.release(), 0, cookie);
- });
+ .Apply([graphId = event->GraphId,
+ cookie = ev->Cookie,
+ sender = ev->Sender,
+ context = TActivationContext::AsActorContext()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) {
+ auto result = futureResult.GetValue();
+ auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first, result.second);
+ if (response->Issues) {
+ LOG_STORAGE_ASYNC_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
+ }
+ LOG_STORAGE_ASYNC_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
+ context.Send(sender, response.release(), 0, cookie);
+ });
}
-void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev) {
+void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev) {
auto* event = ev->Get();
- const auto checkpointId = TCheckpointId(event->Checkpoint.GetGeneration(), event->Checkpoint.GetId());
+ const auto checkpointId = TCheckpointId(event->Checkpoint.GetGeneration(), event->Checkpoint.GetId());
LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << checkpointId << "] Got TEvSaveTaskState: task " << event->TaskId);
-
+
const size_t stateSize = event->State.ByteSizeLong();
if (stateSize > Config.GetStateStorageLimits().GetMaxTaskStateSizeBytes()) {
LOG_STREAMS_STORAGE_SERVICE_WARN("[" << checkpointId << "] Won't save task state because it's too big: task: " << event->TaskId
<< ", state size: " << stateSize << "/" << Config.GetStateStorageLimits().GetMaxTaskStateSizeBytes());
- auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
- response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
- response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
- response->Record.SetStateSizeBytes(0);
+ auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
+ response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
+ response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
+ response->Record.SetStateSizeBytes(0);
response->Record.SetTaskId(event->TaskId);
- response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STATE_TOO_BIG);
- Send(ev->Sender, response.release());
- return;
- }
-
- StateStorage->SaveState(event->TaskId, event->GraphId, checkpointId, event->State)
- .Apply([graphId = event->GraphId,
- checkpointId,
- taskId = event->TaskId,
- cookie = ev->Cookie,
- sender = ev->Sender,
+ response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STATE_TOO_BIG);
+ Send(ev->Sender, response.release());
+ return;
+ }
+
+ StateStorage->SaveState(event->TaskId, event->GraphId, checkpointId, event->State)
+ .Apply([graphId = event->GraphId,
+ checkpointId,
+ taskId = event->TaskId,
+ cookie = ev->Cookie,
+ sender = ev->Sender,
stateSize = stateSize,
- context = TActivationContext::AsActorContext()](const NThreading::TFuture<NYql::TIssues>& futureResult) {
- const auto& issues = futureResult.GetValue();
- auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
- response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
- response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
- response->Record.SetStateSizeBytes(stateSize);
+ context = TActivationContext::AsActorContext()](const NThreading::TFuture<NYql::TIssues>& futureResult) {
+ const auto& issues = futureResult.GetValue();
+ auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
+ response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
+ response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
+ response->Record.SetStateSizeBytes(stateSize);
response->Record.SetTaskId(taskId);
-
- if (issues) {
- LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
- response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
- } else {
- response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK);
- }
- LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
- context.Send(sender, response.release(), 0, cookie);
- });
+
+ if (issues) {
+ LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
+ response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
+ } else {
+ response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK);
+ }
+ LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
+ context.Send(sender, response.release(), 0, cookie);
+ });
}
-void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
+void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
const auto* event = ev->Get();
- const auto checkpointId = TCheckpointId(event->Checkpoint.GetGeneration(), event->Checkpoint.GetId());
+ const auto checkpointId = TCheckpointId(event->Checkpoint.GetGeneration(), event->Checkpoint.GetId());
LOG_STREAMS_STORAGE_SERVICE_DEBUG("[" << checkpointId << "] Got TEvGetTaskState: tasks {" << JoinSeq(", ", event->TaskIds) << "}");
-
+
StateStorage->GetState(event->TaskIds, event->GraphId, checkpointId)
- .Apply([checkpointId = event->Checkpoint,
- generation = event->Generation,
+ .Apply([checkpointId = event->Checkpoint,
+ generation = event->Generation,
taskIds = event->TaskIds,
- cookie = ev->Cookie,
- sender = ev->Sender,
- context = TActivationContext::AsActorContext()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) {
- auto result = resultFuture.GetValue();
+ cookie = ev->Cookie,
+ sender = ev->Sender,
+ context = TActivationContext::AsActorContext()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) {
+ auto result = resultFuture.GetValue();
auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvGetTaskStateResult>(checkpointId, result.second, generation);
std::swap(response->States, result.first);
- if (response->Issues) {
+ if (response->Issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
- }
+ }
LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult");
- context.Send(sender, response.release(), 0, cookie);
- });
+ context.Send(sender, response.release(), 0, cookie);
+ });
}
} // namespace
diff --git a/ydb/core/yq/libs/checkpoint_storage/ya.make b/ydb/core/yq/libs/checkpoint_storage/ya.make
index fc2f449e37..b11c319eaa 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ya.make
+++ b/ydb/core/yq/libs/checkpoint_storage/ya.make
@@ -26,8 +26,8 @@ PEERDIR(
ydb/library/yql/dq/proto
)
-YQL_LAST_ABI_VERSION()
-
+YQL_LAST_ABI_VERSION()
+
END()
RECURSE(
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
index 7799b52daa..034254bb4b 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
@@ -7,7 +7,7 @@
#include <util/stream/str.h>
#include <util/string/builder.h>
-#include <util/string/printf.h>
+#include <util/string/printf.h>
#include <fmt/format.h>
@@ -87,18 +87,18 @@ using TGetCoordinatorsContextPtr = TIntrusivePtr<TGetCoordinatorsContext>;
////////////////////////////////////////////////////////////////////////////////
-struct TAddToStateSizeContext : public TThrRefBase {
- ui64 Size = 0;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TGetTotalCheckpointsStateSizeContext : public TThrRefBase {
- ui64 Size = 0;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
+struct TAddToStateSizeContext : public TThrRefBase {
+ ui64 Size = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TGetTotalCheckpointsStateSizeContext : public TThrRefBase {
+ ui64 Size = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
TFuture<TDataQueryResult> SelectGraphCoordinators(const TGenerationContextPtr& context)
{
// TODO: use prepared queries
@@ -239,14 +239,14 @@ TFuture<TStatus> UpdateCheckpoint(const TCheckpointContextPtr& context) {
PRAGMA TablePathPrefix("%s");
$ts = cast(%lu as Timestamp);
- UPSERT INTO %s (graph_id, coordinator_generation, seq_no, status, modified_by) VALUES
+ UPSERT INTO %s (graph_id, coordinator_generation, seq_no, status, modified_by) VALUES
("%s", %lu, %lu, %u, $ts);
)", generationContext->TablePathPrefix.c_str(),
TInstant::Now().MicroSeconds(),
CheckpointsMetadataTable,
generationContext->PrimaryKey.c_str(),
- context->CheckpointId.CoordinatorGeneration,
- context->CheckpointId.SeqNo,
+ context->CheckpointId.CoordinatorGeneration,
+ context->CheckpointId.SeqNo,
(ui32)context->Status);
auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx();
@@ -325,22 +325,22 @@ TFuture<TStatus> CreateCheckpointWrapper(
TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& context, const TVector<ECheckpointStatus>& statuses, ui64 limit, TExecDataQuerySettings settings, bool loadGraphDescription)
{
- NYdb::TParamsBuilder paramsBuilder;
- if (statuses) {
- auto& statusesParam = paramsBuilder.AddParam("$statuses").BeginList();
- for (const auto& status : statuses) {
- statusesParam.AddListItem().Uint8(static_cast<ui8>(status));
- }
- statusesParam.EndList().Build();
- }
-
- paramsBuilder.AddParam("$graph_id").String(context->PrimaryKey).Build();
+ NYdb::TParamsBuilder paramsBuilder;
+ if (statuses) {
+ auto& statusesParam = paramsBuilder.AddParam("$statuses").BeginList();
+ for (const auto& status : statuses) {
+ statusesParam.AddListItem().Uint8(static_cast<ui8>(status));
+ }
+ statusesParam.EndList().Build();
+ }
+
+ paramsBuilder.AddParam("$graph_id").String(context->PrimaryKey).Build();
if (limit < std::numeric_limits<ui64>::max()) {
paramsBuilder.AddParam("$limit").Uint64(limit).Build();
}
-
- auto params = paramsBuilder.Build();
-
+
+ auto params = paramsBuilder.Build();
+
using namespace fmt::literals;
TString join;
if (loadGraphDescription) {
@@ -355,12 +355,12 @@ TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& co
const TString query = fmt::format(R"sql(
--!syntax_v1
PRAGMA TablePathPrefix("{table_path_prefix}");
- PRAGMA AnsiInForEmptyOrNullableItemsCollections;
+ PRAGMA AnsiInForEmptyOrNullableItemsCollections;
DECLARE $graph_id AS String;
{optional_statuses_declaration}
{optional_limit_declaration}
-
+
SELECT
{graph_description_field}
metadata.coordinator_generation AS coordinator_generation,
@@ -372,7 +372,7 @@ TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& co
{join}
WHERE metadata.graph_id = $graph_id
{statuses_condition}
- ORDER BY coordinator_generation, seq_no DESC
+ ORDER BY coordinator_generation, seq_no DESC
{limit_condition};
)sql",
"table_path_prefix"_a = context->TablePathPrefix,
@@ -387,9 +387,9 @@ TFuture<TDataQueryResult> SelectGraphCheckpoints(const TGenerationContextPtr& co
return context->Session.ExecuteDataQuery(
query,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- params,
- settings);
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ settings);
}
TFuture<TStatus> ProcessCheckpoints(
@@ -406,15 +406,15 @@ TFuture<TStatus> ProcessCheckpoints(
while (parser.TryNextRow()) {
TCheckpointId checkpointId(
- *parser.ColumnParser("coordinator_generation").GetOptionalUint64(),
- *parser.ColumnParser("seq_no").GetOptionalUint64());
+ *parser.ColumnParser("coordinator_generation").GetOptionalUint64(),
+ *parser.ColumnParser("seq_no").GetOptionalUint64());
getContext->Checkpoints.emplace_back(
context->PrimaryKey,
checkpointId,
ECheckpointStatus(*parser.ColumnParser("status").GetOptionalUint8()),
- *parser.ColumnParser("created_by").GetOptionalTimestamp(),
- *parser.ColumnParser("modified_by").GetOptionalTimestamp());
+ *parser.ColumnParser("created_by").GetOptionalTimestamp(),
+ *parser.ColumnParser("modified_by").GetOptionalTimestamp());
if (loadGraphDescription) {
if (const TMaybe<TString> graphDescription = parser.ColumnParser("graph_description").GetOptionalString(); graphDescription && *graphDescription) {
@@ -446,12 +446,12 @@ TFuture<TDataQueryResult> SelectCheckpoint(const TCheckpointContextPtr& context)
SELECT status
FROM %s
- WHERE graph_id = "%s" AND coordinator_generation = %lu AND seq_no = %lu;
+ WHERE graph_id = "%s" AND coordinator_generation = %lu AND seq_no = %lu;
)", generationContext->TablePathPrefix.c_str(),
CheckpointsMetadataTable,
- generationContext->PrimaryKey.c_str(),
- context->CheckpointId.CoordinatorGeneration,
- context->CheckpointId.SeqNo);
+ generationContext->PrimaryKey.c_str(),
+ context->CheckpointId.CoordinatorGeneration,
+ context->CheckpointId.SeqNo);
return generationContext->Session.ExecuteDataQuery(
query,
@@ -549,7 +549,7 @@ TFuture<TStatus> UpdateCheckpointWithCheckWrapper(
class TCheckpointStorage : public ICheckpointStorage {
TYdbConnectionPtr YdbConnection;
- const NConfig::TYdbStorageConfig Config;
+ const NConfig::TYdbStorageConfig Config;
public:
explicit TCheckpointStorage(
@@ -585,14 +585,14 @@ public:
TFuture<TIssues> AbortCheckpoint(
const TCoordinatorId& coordinator,
- const TCheckpointId& checkpointId) override;
+ const TCheckpointId& checkpointId) override;
TFuture<TGetCheckpointsResult> GetCheckpoints(
const TString& graph) override;
- TFuture<TGetCheckpointsResult> GetCheckpoints(
+ TFuture<TGetCheckpointsResult> GetCheckpoints(
const TString& graph, const TVector<ECheckpointStatus>& statuses, ui64 limit, bool loadGraphDescription) override;
-
+
TFuture<TIssues> DeleteGraph(
const TString& graphId) override;
@@ -603,14 +603,14 @@ public:
TFuture<TIssues> DeleteMarkedCheckpoints(
const TString& graphId,
const TCheckpointId& checkpointUpperBound) override;
-
- TFuture<ICheckpointStorage::TAddToStateSizeResult> AddToStateSize(
- const TString& graphId,
- const TCheckpointId& checkpoint,
- ui64 size) override;
-
- TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> GetTotalCheckpointsStateSize(const TString& graphId) override;
- TExecDataQuerySettings DefaultExecDataQuerySettings();
+
+ TFuture<ICheckpointStorage::TAddToStateSizeResult> AddToStateSize(
+ const TString& graphId,
+ const TCheckpointId& checkpoint,
+ ui64 size) override;
+
+ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> GetTotalCheckpointsStateSize(const TString& graphId) override;
+ TExecDataQuerySettings DefaultExecDataQuerySettings();
private:
TFuture<TCreateCheckpointResult> CreateCheckpointImpl(const TCoordinatorId& coordinator, const TCheckpointContextPtr& context);
@@ -626,7 +626,7 @@ TCheckpointStorage::TCheckpointStorage(
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const IEntityIdGenerator::TPtr& entityIdGenerator)
: YdbConnection(NewYdbConnection(config, credentialsProviderFactory))
- , Config(config)
+ , Config(config)
, EntityIdGenerator(entityIdGenerator)
{
}
@@ -685,14 +685,14 @@ TFuture<TIssues> TCheckpointStorage::Init()
// so we set it primary key column to have index
auto checkpointDesc = TTableBuilder()
.AddNullableColumn("graph_id", EPrimitiveType::String)
- .AddNullableColumn("coordinator_generation", EPrimitiveType::Uint64)
- .AddNullableColumn("seq_no", EPrimitiveType::Uint64)
+ .AddNullableColumn("coordinator_generation", EPrimitiveType::Uint64)
+ .AddNullableColumn("seq_no", EPrimitiveType::Uint64)
.AddNullableColumn("status", EPrimitiveType::Uint8)
- .AddNullableColumn("created_by", EPrimitiveType::Timestamp)
- .AddNullableColumn("modified_by", EPrimitiveType::Timestamp)
- .AddNullableColumn("state_size", EPrimitiveType::Uint64)
+ .AddNullableColumn("created_by", EPrimitiveType::Timestamp)
+ .AddNullableColumn("modified_by", EPrimitiveType::Timestamp)
+ .AddNullableColumn("state_size", EPrimitiveType::Uint64)
.AddNullableColumn("graph_description_id", EPrimitiveType::String)
- .SetPrimaryKeyColumns({"graph_id", "coordinator_generation", "seq_no"})
+ .SetPrimaryKeyColumns({"graph_id", "coordinator_generation", "seq_no"})
.Build();
RUN_CREATE_TABLE(CheckpointsMetadataTable, checkpointDesc);
@@ -870,10 +870,10 @@ TFuture<TIssues> TCheckpointStorage::AbortCheckpoint(
return StatusToIssues(future);
}
-TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckpoints(const TString& graph) {
+TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckpoints(const TString& graph) {
return GetCheckpoints(graph, TVector<ECheckpointStatus>(), std::numeric_limits<ui64>::max(), true);
-}
-
+}
+
TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckpoints(
const TString& graph, const TVector<ECheckpointStatus>& statuses, ui64 limit, bool loadGraphDescription)
{
@@ -953,18 +953,18 @@ TFuture<TIssues> TCheckpointStorage::MarkCheckpointsGC(
$ts = cast(%lu as Timestamp);
UPDATE %s
- SET status = %u, modified_by = $ts
+ SET status = %u, modified_by = $ts
WHERE graph_id = "%s" AND
- (coordinator_generation < %lu OR
- (coordinator_generation = %lu AND seq_no < %lu));
+ (coordinator_generation < %lu OR
+ (coordinator_generation = %lu AND seq_no < %lu));
)", prefix.c_str(),
TInstant::Now().MicroSeconds(),
CheckpointsMetadataTable,
(ui32)ECheckpointStatus::GC,
graphId.c_str(),
- checkpointUpperBound.CoordinatorGeneration,
- checkpointUpperBound.CoordinatorGeneration,
- checkpointUpperBound.SeqNo);
+ checkpointUpperBound.CoordinatorGeneration,
+ checkpointUpperBound.CoordinatorGeneration,
+ checkpointUpperBound.SeqNo);
auto future = session.ExecuteDataQuery(
query,
@@ -1056,127 +1056,127 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
return StatusToIssues(future);
}
TFuture<ICheckpointStorage::TAddToStateSizeResult> NYq::TCheckpointStorage::AddToStateSize(const TString& graphId, const TCheckpointId& checkpointId, ui64 size) {
- auto result = MakeIntrusive<TAddToStateSizeContext>();
- auto future = YdbConnection->Client.RetryOperation(
- [prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, size, result, thisPtr = TIntrusivePtr(this)](TSession session) {
- NYdb::TParamsBuilder paramsBuilder;
- paramsBuilder.AddParam("$graph_id").String(graphId).Build();
- paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointId.CoordinatorGeneration).Build();
- paramsBuilder.AddParam("$seq_no").Uint64(checkpointId.SeqNo).Build();
- paramsBuilder.AddParam("$task_state_size").Uint64(size).Build();
- auto params = paramsBuilder.Build();
-
- auto query = Sprintf(R"(
- --!syntax_v1
- PRAGMA TablePathPrefix("%s");
-
- declare $graph_id as string;
- declare $coordinator_generation as Uint64;
- declare $seq_no as Uint64;
- declare $task_state_size as Uint64;
-
- $current_size = SELECT state_size
- FROM %s
- WHERE graph_id = $graph_id
- AND coordinator_generation = $coordinator_generation
- AND seq_no = $seq_no;
-
- UPDATE %s
- SET state_size = $current_size + $task_state_size
- WHERE graph_id = $graph_id
- AND coordinator_generation = $coordinator_generation
- AND seq_no = $seq_no;
-
- SELECT $current_size + $task_state_size;
- )", prefix.c_str(), CheckpointsMetadataTable, CheckpointsMetadataTable);
-
- return session.ExecuteDataQuery(
- query,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- params,
- thisPtr->DefaultExecDataQuerySettings())
- .Apply(
- [result, graphId, checkpointId](const TFuture<TDataQueryResult>& future) {
- const auto& queryResult = future.GetValue();
- auto status = TStatus(queryResult);
-
- if (!queryResult.IsSuccess()) {
- Cerr << "AddToStateSize: can't update state size [" << graphId << " " << checkpointId << "] " << queryResult.GetIssues().ToString() << Endl;
- return status;
- }
- TResultSetParser parser = queryResult.GetResultSetParser(0);
- if (parser.TryNextRow()) {
- result->Size = parser.ColumnParser(0).GetOptionalUint64().GetOrElse(0);
- } else {
- Cerr << "ERROR: got no rows in AddToStateSize result set" << Endl;
- }
- return status;
- });
- });
-
- return future.Apply(
- [result](const TFuture<TStatus>& status) {
- return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues()));
- });
-}
-
+ auto result = MakeIntrusive<TAddToStateSizeContext>();
+ auto future = YdbConnection->Client.RetryOperation(
+ [prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, size, result, thisPtr = TIntrusivePtr(this)](TSession session) {
+ NYdb::TParamsBuilder paramsBuilder;
+ paramsBuilder.AddParam("$graph_id").String(graphId).Build();
+ paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointId.CoordinatorGeneration).Build();
+ paramsBuilder.AddParam("$seq_no").Uint64(checkpointId.SeqNo).Build();
+ paramsBuilder.AddParam("$task_state_size").Uint64(size).Build();
+ auto params = paramsBuilder.Build();
+
+ auto query = Sprintf(R"(
+ --!syntax_v1
+ PRAGMA TablePathPrefix("%s");
+
+ declare $graph_id as string;
+ declare $coordinator_generation as Uint64;
+ declare $seq_no as Uint64;
+ declare $task_state_size as Uint64;
+
+ $current_size = SELECT state_size
+ FROM %s
+ WHERE graph_id = $graph_id
+ AND coordinator_generation = $coordinator_generation
+ AND seq_no = $seq_no;
+
+ UPDATE %s
+ SET state_size = $current_size + $task_state_size
+ WHERE graph_id = $graph_id
+ AND coordinator_generation = $coordinator_generation
+ AND seq_no = $seq_no;
+
+ SELECT $current_size + $task_state_size;
+ )", prefix.c_str(), CheckpointsMetadataTable, CheckpointsMetadataTable);
+
+ return session.ExecuteDataQuery(
+ query,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ thisPtr->DefaultExecDataQuerySettings())
+ .Apply(
+ [result, graphId, checkpointId](const TFuture<TDataQueryResult>& future) {
+ const auto& queryResult = future.GetValue();
+ auto status = TStatus(queryResult);
+
+ if (!queryResult.IsSuccess()) {
+ Cerr << "AddToStateSize: can't update state size [" << graphId << " " << checkpointId << "] " << queryResult.GetIssues().ToString() << Endl;
+ return status;
+ }
+ TResultSetParser parser = queryResult.GetResultSetParser(0);
+ if (parser.TryNextRow()) {
+ result->Size = parser.ColumnParser(0).GetOptionalUint64().GetOrElse(0);
+ } else {
+ Cerr << "ERROR: got no rows in AddToStateSize result set" << Endl;
+ }
+ return status;
+ });
+ });
+
+ return future.Apply(
+ [result](const TFuture<TStatus>& status) {
+ return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues()));
+ });
+}
+
TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> NYq::TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) {
- auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>();
- auto future = YdbConnection->Client.RetryOperation(
- [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) {
- NYdb::TParamsBuilder paramsBuilder;
- paramsBuilder.AddParam("$graph_id").String(graphId).Build();
- auto params = paramsBuilder.Build();
-
- auto query = Sprintf(R"(
- --!syntax_v1
- PRAGMA TablePathPrefix("%s");
-
- declare $graph_id as string;
-
- SELECT SUM(state_size)
- FROM %s
- WHERE graph_id = $graph_id
- )", prefix.c_str(), CheckpointsMetadataTable);
-
- return session.ExecuteDataQuery(
- query,
- TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx(),
- params,
- thisPtr->DefaultExecDataQuerySettings())
- .Apply(
- [graphId, result](const TFuture<TDataQueryResult>& future) {
- const auto& queryResult = future.GetValue();
- auto status = TStatus(queryResult);
-
- if (!queryResult.IsSuccess()) {
- Cerr << "AddToStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString() << Endl;
- return status;
- }
-
- TResultSetParser parser = queryResult.GetResultSetParser(0);
- if (parser.TryNextRow()) {
- result->Size = parser.ColumnParser(0).GetOptionalUint64().GetOrElse(0);
- } else {
- result->Size = 0;
- }
- return status;
- });
- });
- return future.Apply(
- [result](const TFuture<TStatus>& status) {
- return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues()));
- });
-}
-
+ auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>();
+ auto future = YdbConnection->Client.RetryOperation(
+ [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) {
+ NYdb::TParamsBuilder paramsBuilder;
+ paramsBuilder.AddParam("$graph_id").String(graphId).Build();
+ auto params = paramsBuilder.Build();
+
+ auto query = Sprintf(R"(
+ --!syntax_v1
+ PRAGMA TablePathPrefix("%s");
+
+ declare $graph_id as string;
+
+ SELECT SUM(state_size)
+ FROM %s
+ WHERE graph_id = $graph_id
+ )", prefix.c_str(), CheckpointsMetadataTable);
+
+ return session.ExecuteDataQuery(
+ query,
+ TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx(),
+ params,
+ thisPtr->DefaultExecDataQuerySettings())
+ .Apply(
+ [graphId, result](const TFuture<TDataQueryResult>& future) {
+ const auto& queryResult = future.GetValue();
+ auto status = TStatus(queryResult);
+
+ if (!queryResult.IsSuccess()) {
+ Cerr << "AddToStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString() << Endl;
+ return status;
+ }
+
+ TResultSetParser parser = queryResult.GetResultSetParser(0);
+ if (parser.TryNextRow()) {
+ result->Size = parser.ColumnParser(0).GetOptionalUint64().GetOrElse(0);
+ } else {
+ result->Size = 0;
+ }
+ return status;
+ });
+ });
+ return future.Apply(
+ [result](const TFuture<TStatus>& status) {
+ return std::make_pair(std::move(result->Size), std::move(status.GetValue().GetIssues()));
+ });
+}
+
TExecDataQuerySettings NYq::TCheckpointStorage::DefaultExecDataQuerySettings() {
- return TExecDataQuerySettings()
- .KeepInQueryCache(true)
- .ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec()))
- .OperationTimeout(TDuration::Seconds(Config.GetOperationTimeoutSec()))
- .CancelAfter(TDuration::Seconds(Config.GetCancelAfterSec()));
-}
-
+ return TExecDataQuerySettings()
+ .KeepInQueryCache(true)
+ .ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec()))
+ .OperationTimeout(TDuration::Seconds(Config.GetOperationTimeoutSec()))
+ .CancelAfter(TDuration::Seconds(Config.GetCancelAfterSec()));
+}
+
} // namespace
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
index ab209ec1e8..6a57254e10 100644
--- a/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
+++ b/ydb/core/yq/libs/checkpoint_storage/ydb_state_storage.cpp
@@ -147,7 +147,7 @@ TFuture<TStatus> ProcessState(
class TStateStorage : public IStateStorage {
TYdbConnectionPtr YdbConnection;
- const NConfig::TYdbStorageConfig Config;
+ const NConfig::TYdbStorageConfig Config;
public:
explicit TStateStorage(
@@ -178,10 +178,10 @@ public:
TFuture<TIssues> DeleteCheckpoints(
const TString& graphId,
const TCheckpointId& checkpointId) override;
-
- TFuture<TDataQueryResult> SelectState(const TContextPtr& context);
- TFuture<TStatus> UpsertState(const TContextPtr& context);
- TExecDataQuerySettings DefaultExecDataQuerySettings();
+
+ TFuture<TDataQueryResult> SelectState(const TContextPtr& context);
+ TFuture<TStatus> UpsertState(const TContextPtr& context);
+ TExecDataQuerySettings DefaultExecDataQuerySettings();
};
////////////////////////////////////////////////////////////////////////////////
@@ -190,7 +190,7 @@ TStateStorage::TStateStorage(
const NConfig::TYdbStorageConfig& config,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
: YdbConnection(NewYdbConnection(config, credentialsProviderFactory))
- , Config(config)
+ , Config(config)
{
}
@@ -219,12 +219,12 @@ TFuture<TIssues> TStateStorage::Init()
}
auto stateDesc = TTableBuilder()
- .AddNullableColumn("graph_id", EPrimitiveType::String)
+ .AddNullableColumn("graph_id", EPrimitiveType::String)
.AddNullableColumn("task_id", EPrimitiveType::Uint64)
- .AddNullableColumn("coordinator_generation", EPrimitiveType::Uint64)
- .AddNullableColumn("seq_no", EPrimitiveType::Uint64)
+ .AddNullableColumn("coordinator_generation", EPrimitiveType::Uint64)
+ .AddNullableColumn("seq_no", EPrimitiveType::Uint64)
.AddNullableColumn("blob", EPrimitiveType::String)
- .SetPrimaryKeyColumns({"graph_id", "task_id", "coordinator_generation", "seq_no"})
+ .SetPrimaryKeyColumns({"graph_id", "task_id", "coordinator_generation", "seq_no"})
.Build();
auto status = CreateTable(YdbConnection, StatesTable, std::move(stateDesc)).GetValueSync();
@@ -260,7 +260,7 @@ TFuture<TIssues> TStateStorage::SaveState(
state,
session);
- return thisPtr->UpsertState(context);
+ return thisPtr->UpsertState(context);
});
return StatusToIssues(future);
@@ -292,7 +292,7 @@ TFuture<IStateStorage::TGetStateResult> TStateStorage::GetState(
auto future = YdbConnection->Client.RetryOperation(
[context, thisPtr = TIntrusivePtr(this)] (TSession session) {
context->Session = session;
- auto future = thisPtr->SelectState(context);
+ auto future = thisPtr->SelectState(context);
return future.Apply(
[context] (const TFuture<TDataQueryResult>& future) {
return ProcessState(future.GetValue(), context);
@@ -315,33 +315,33 @@ TFuture<IStateStorage::TCountStatesResult> TStateStorage::CountStates(
auto context = MakeIntrusive<TCountStateContext>();
auto future = YdbConnection->Client.RetryOperation(
- [prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, context, thisPtr = TIntrusivePtr(this)] (TSession session) {
-
- // publish nodes
- NYdb::TParamsBuilder paramsBuilder;
- paramsBuilder.AddParam("$graph_id").String(graphId).Build();
- paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointId.CoordinatorGeneration).Build();
- paramsBuilder.AddParam("$seq_no").Uint64(checkpointId.SeqNo).Build();
-
- auto params = paramsBuilder.Build();
+ [prefix = YdbConnection->TablePathPrefix, graphId, checkpointId, context, thisPtr = TIntrusivePtr(this)] (TSession session) {
+
+ // publish nodes
+ NYdb::TParamsBuilder paramsBuilder;
+ paramsBuilder.AddParam("$graph_id").String(graphId).Build();
+ paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointId.CoordinatorGeneration).Build();
+ paramsBuilder.AddParam("$seq_no").Uint64(checkpointId.SeqNo).Build();
+
+ auto params = paramsBuilder.Build();
auto query = Sprintf(R"(
--!syntax_v1
PRAGMA TablePathPrefix("%s");
- declare $graph_id as string;
- declare $coordinator_generation as Uint64;
- declare $seq_no as Uint64;
-
+ declare $graph_id as string;
+ declare $coordinator_generation as Uint64;
+ declare $seq_no as Uint64;
+
SELECT COUNT(*) as cnt
FROM %s
- WHERE graph_id = $graph_id AND coordinator_generation = $coordinator_generation and seq_no = $seq_no;
- )", prefix.c_str(), StatesTable);
+ WHERE graph_id = $graph_id AND coordinator_generation = $coordinator_generation and seq_no = $seq_no;
+ )", prefix.c_str(), StatesTable);
auto future = session.ExecuteDataQuery(
query,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- params,
- thisPtr->DefaultExecDataQuerySettings());
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ thisPtr->DefaultExecDataQuerySettings());
return future.Apply(
[context] (const TFuture<TDataQueryResult>& future) {
@@ -370,30 +370,30 @@ TFuture<IStateStorage::TCountStatesResult> TStateStorage::CountStates(
return countResult;
});
}
-TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() {
- return TExecDataQuerySettings()
- .KeepInQueryCache(true)
- .ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec()))
- .OperationTimeout(TDuration::Seconds(Config.GetOperationTimeoutSec()))
- .CancelAfter(TDuration::Seconds(Config.GetCancelAfterSec()));
-}
+TExecDataQuerySettings TStateStorage::DefaultExecDataQuerySettings() {
+ return TExecDataQuerySettings()
+ .KeepInQueryCache(true)
+ .ClientTimeout(TDuration::Seconds(Config.GetClientTimeoutSec()))
+ .OperationTimeout(TDuration::Seconds(Config.GetOperationTimeoutSec()))
+ .CancelAfter(TDuration::Seconds(Config.GetCancelAfterSec()));
+}
TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) {
auto future = YdbConnection->Client.RetryOperation(
- [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this)] (TSession session) {
-
- // publish nodes
- NYdb::TParamsBuilder paramsBuilder;
- paramsBuilder.AddParam("$graph_id").String(graphId).Build();
-
- auto params = paramsBuilder.Build();
-
+ [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this)] (TSession session) {
+
+ // publish nodes
+ NYdb::TParamsBuilder paramsBuilder;
+ paramsBuilder.AddParam("$graph_id").String(graphId).Build();
+
+ auto params = paramsBuilder.Build();
+
auto query = Sprintf(R"(
--!syntax_v1
PRAGMA TablePathPrefix("%s");
- declare $graph_id as string;
-
+ declare $graph_id as string;
+
DELETE
FROM %s
WHERE graph_id = "%s";
@@ -401,9 +401,9 @@ TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) {
auto future = session.ExecuteDataQuery(
query,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- params,
- thisPtr->DefaultExecDataQuerySettings());
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ thisPtr->DefaultExecDataQuerySettings());
return future.Apply(
[] (const TFuture<TDataQueryResult>& future) {
@@ -420,36 +420,36 @@ TFuture<TIssues> TStateStorage::DeleteCheckpoints(
const TCheckpointId& checkpointUpperBound)
{
auto future = YdbConnection->Client.RetryOperation(
- [prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) {
-
- // publish nodes
- NYdb::TParamsBuilder paramsBuilder;
- paramsBuilder.AddParam("$graph_id").String(graphId).Build();
- paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointUpperBound.CoordinatorGeneration).Build();
- paramsBuilder.AddParam("$seq_no").Uint64(checkpointUpperBound.SeqNo).Build();
-
- auto params = paramsBuilder.Build();
-
+ [prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) {
+
+ // publish nodes
+ NYdb::TParamsBuilder paramsBuilder;
+ paramsBuilder.AddParam("$graph_id").String(graphId).Build();
+ paramsBuilder.AddParam("$coordinator_generation").Uint64(checkpointUpperBound.CoordinatorGeneration).Build();
+ paramsBuilder.AddParam("$seq_no").Uint64(checkpointUpperBound.SeqNo).Build();
+
+ auto params = paramsBuilder.Build();
+
auto query = Sprintf(R"(
--!syntax_v1
PRAGMA TablePathPrefix("%s");
- declare $graph_id as string;
- declare $coordinator_generation as Uint64;
- declare $seq_no as Uint64;
-
+ declare $graph_id as string;
+ declare $coordinator_generation as Uint64;
+ declare $seq_no as Uint64;
+
DELETE
FROM %s
- WHERE graph_id = $graph_id AND
- (coordinator_generation < $coordinator_generation OR
- (coordinator_generation = $coordinator_generation AND seq_no < $seq_no));
- )", prefix.c_str(), StatesTable);
+ WHERE graph_id = $graph_id AND
+ (coordinator_generation < $coordinator_generation OR
+ (coordinator_generation = $coordinator_generation AND seq_no < $seq_no));
+ )", prefix.c_str(), StatesTable);
auto future = session.ExecuteDataQuery(
query,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- params,
- thisPtr->DefaultExecDataQuerySettings());
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ thisPtr->DefaultExecDataQuerySettings());
return future.Apply(
[] (const TFuture<TDataQueryResult>& future) {
@@ -460,9 +460,9 @@ TFuture<TIssues> TStateStorage::DeleteCheckpoints(
return StatusToIssues(future);
}
-TFuture<TDataQueryResult> TStateStorage::SelectState(const TContextPtr& context)
-{
- NYdb::TParamsBuilder paramsBuilder;
+TFuture<TDataQueryResult> TStateStorage::SelectState(const TContextPtr& context)
+{
+ NYdb::TParamsBuilder paramsBuilder;
Y_VERIFY(!context->TaskIds.empty());
if (context->TaskIds.size() == 1) {
paramsBuilder.AddParam("$task_id").Uint64(context->TaskIds[0]).Build();
@@ -473,83 +473,83 @@ TFuture<TDataQueryResult> TStateStorage::SelectState(const TContextPtr& context)
}
taskIdsParam.EndList().Build();
}
- paramsBuilder.AddParam("$graph_id").String(context->GraphId).Build();
- paramsBuilder.AddParam("$coordinator_generation").Uint64(context->CheckpointId.CoordinatorGeneration).Build();
- paramsBuilder.AddParam("$seq_no").Uint64(context->CheckpointId.SeqNo).Build();
-
- auto params = paramsBuilder.Build();
-
- auto query = Sprintf(R"(
- --!syntax_v1
- PRAGMA TablePathPrefix("%s");
-
+ paramsBuilder.AddParam("$graph_id").String(context->GraphId).Build();
+ paramsBuilder.AddParam("$coordinator_generation").Uint64(context->CheckpointId.CoordinatorGeneration).Build();
+ paramsBuilder.AddParam("$seq_no").Uint64(context->CheckpointId.SeqNo).Build();
+
+ auto params = paramsBuilder.Build();
+
+ auto query = Sprintf(R"(
+ --!syntax_v1
+ PRAGMA TablePathPrefix("%s");
+
%s;
DECLARE $graph_id AS string;
DECLARE $coordinator_generation AS Uint64;
DECLARE $seq_no AS Uint64;
-
+
SELECT task_id, blob
- FROM %s
+ FROM %s
WHERE %s AND graph_id = $graph_id AND coordinator_generation = $coordinator_generation AND seq_no = $seq_no;
)",
context->TablePathPrefix.c_str(),
context->TaskIds.size() == 1 ? "DECLARE $task_id AS Uint64" : "DECLARE $task_ids AS List<Uint64>",
StatesTable,
context->TaskIds.size() == 1 ? "task_id = $task_id" : "task_id IN $task_ids");
-
+
Y_VERIFY(context->Session);
return context->Session->ExecuteDataQuery(
- query,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- params,
- DefaultExecDataQuerySettings());
-}
-
-TFuture<TStatus> TStateStorage::UpsertState(const TContextPtr& context) {
+ query,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ DefaultExecDataQuerySettings());
+}
+
+TFuture<TStatus> TStateStorage::UpsertState(const TContextPtr& context) {
Y_VERIFY(context->States.size() == 1);
TString serializedState;
if (!context->States[0].SerializeToString(&serializedState)) {
return MakeFuture(MakeErrorStatus(EStatus::BAD_REQUEST, "Failed to serialize compute actor state", NYql::TSeverityIds::S_ERROR));
}
-
- // publish nodes
- NYdb::TParamsBuilder paramsBuilder;
+
+ // publish nodes
+ NYdb::TParamsBuilder paramsBuilder;
Y_VERIFY(context->TaskIds.size() == 1);
paramsBuilder.AddParam("$task_id").Uint64(context->TaskIds[0]).Build();
- paramsBuilder.AddParam("$graph_id").String(context->GraphId).Build();
- paramsBuilder.AddParam("$coordinator_generation").Uint64(context->CheckpointId.CoordinatorGeneration).Build();
- paramsBuilder.AddParam("$seq_no").Uint64(context->CheckpointId.SeqNo).Build();
+ paramsBuilder.AddParam("$graph_id").String(context->GraphId).Build();
+ paramsBuilder.AddParam("$coordinator_generation").Uint64(context->CheckpointId.CoordinatorGeneration).Build();
+ paramsBuilder.AddParam("$seq_no").Uint64(context->CheckpointId.SeqNo).Build();
paramsBuilder.AddParam("$blob").String(serializedState).Build();
-
- auto params = paramsBuilder.Build();
-
- auto query = Sprintf(R"(
- --!syntax_v1
- PRAGMA TablePathPrefix("%s");
-
- declare $task_id as Uint64;
- declare $graph_id as string;
- declare $coordinator_generation as Uint64;
- declare $seq_no as Uint64;
- declare $blob as string;
-
- UPSERT INTO %s (task_id, graph_id, coordinator_generation, seq_no, blob) VALUES
- ($task_id, $graph_id, $coordinator_generation, $seq_no, $blob);
- )", context->TablePathPrefix.c_str(), StatesTable);
-
+
+ auto params = paramsBuilder.Build();
+
+ auto query = Sprintf(R"(
+ --!syntax_v1
+ PRAGMA TablePathPrefix("%s");
+
+ declare $task_id as Uint64;
+ declare $graph_id as string;
+ declare $coordinator_generation as Uint64;
+ declare $seq_no as Uint64;
+ declare $blob as string;
+
+ UPSERT INTO %s (task_id, graph_id, coordinator_generation, seq_no, blob) VALUES
+ ($task_id, $graph_id, $coordinator_generation, $seq_no, $blob);
+ )", context->TablePathPrefix.c_str(), StatesTable);
+
Y_VERIFY(context->Session);
auto future = context->Session->ExecuteDataQuery(
- query,
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- params,
- DefaultExecDataQuerySettings());
- return future.Apply(
- [] (const TFuture<TDataQueryResult>& future) {
- TStatus status = future.GetValue();
- return status;
- });
-}
-
+ query,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ params,
+ DefaultExecDataQuerySettings());
+ return future.Apply(
+ [] (const TFuture<TDataQueryResult>& future) {
+ TStatus status = future.GetValue();
+ return status;
+ });
+}
+
} // namespace
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
index d9d354c2e5..1c2ab3ea06 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -1,20 +1,20 @@
#include "utils.h"
-#include "checkpoint_coordinator.h"
-
+#include "checkpoint_coordinator.h"
+
#include <ydb/core/yq/libs/actors/logging/log.h>
-
-#include <library/cpp/actors/core/actor.h>
-#include <library/cpp/actors/core/hfunc.h>
-
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/hfunc.h>
+
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h>
#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/dq/state/dq_state_load_plan.h>
-
+
#include <util/string/builder.h>
-#include <utility>
-
+#include <utility>
+
#define CC_LOG_D(stream) \
LOG_STREAMS_CHECKPOINT_COORDINATOR_DEBUG("[" << CoordinatorId << "] " << stream)
#define CC_LOG_I(stream) \
@@ -25,31 +25,31 @@
LOG_STREAMS_CHECKPOINT_COORDINATOR_ERROR("[" << CoordinatorId << "] " << stream)
namespace NYq {
-
-TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId,
- const TActorId& taskControllerId,
+
+TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId,
+ const TActorId& taskControllerId,
const TActorId& storageProxy,
const TActorId& runActorId,
const TCheckpointCoordinatorConfig& settings,
- const NMonitoring::TDynamicCounterPtr& counters,
+ const NMonitoring::TDynamicCounterPtr& counters,
const NProto::TGraphParams& graphParams,
const YandexQuery::StateLoadMode& stateLoadMode,
const YandexQuery::StreamingDisposition& streamingDisposition)
- : CoordinatorId(std::move(coordinatorId))
- , TaskControllerId(taskControllerId)
- , StorageProxy(storageProxy)
+ : CoordinatorId(std::move(coordinatorId))
+ , TaskControllerId(taskControllerId)
+ , StorageProxy(storageProxy)
, RunActorId(runActorId)
- , Settings(settings)
- , CheckpointingPeriod(TDuration::MilliSeconds(Settings.GetCheckpointingPeriodMillis()))
+ , Settings(settings)
+ , CheckpointingPeriod(TDuration::MilliSeconds(Settings.GetCheckpointingPeriodMillis()))
, GraphParams(graphParams)
- , Metrics(TCheckpointCoordinatorMetrics(counters))
+ , Metrics(TCheckpointCoordinatorMetrics(counters))
, StateLoadMode(stateLoadMode)
, StreamingDisposition(streamingDisposition)
{
-}
-
-void TCheckpointCoordinator::Bootstrap() {
- Become(&TThis::DispatchEvent);
+}
+
+void TCheckpointCoordinator::Bootstrap() {
+ Become(&TThis::DispatchEvent);
CC_LOG_D("Bootstrapped with streaming disposition " << StreamingDisposition << " and state load mode " << YandexQuery::StateLoadMode_Name(StateLoadMode));
}
@@ -91,37 +91,37 @@ void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) {
PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size());
CC_LOG_D("Send TEvRegisterCoordinatorRequest");
- Send(StorageProxy, new TEvCheckpointStorage::TEvRegisterCoordinatorRequest(CoordinatorId), IEventHandle::FlagTrackDelivery);
-}
-
-void TCheckpointCoordinator::ScheduleNextCheckpoint() {
- Schedule(CheckpointingPeriod, new TEvCheckpointCoordinator::TEvScheduleCheckpointing());
-}
-
-void TCheckpointCoordinator::UpdateInProgressMetric() {
- const auto pending = PendingCheckpoints.size();
- const auto pendingCommit = PendingCommitCheckpoints.size();
- Metrics.Pending->Set(pending);
- Metrics.PendingCommit->Set(pendingCommit);
- Metrics.InProgress->Set(pending + pendingCommit);
-}
-
-void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr& ev) {
+ Send(StorageProxy, new TEvCheckpointStorage::TEvRegisterCoordinatorRequest(CoordinatorId), IEventHandle::FlagTrackDelivery);
+}
+
+void TCheckpointCoordinator::ScheduleNextCheckpoint() {
+ Schedule(CheckpointingPeriod, new TEvCheckpointCoordinator::TEvScheduleCheckpointing());
+}
+
+void TCheckpointCoordinator::UpdateInProgressMetric() {
+ const auto pending = PendingCheckpoints.size();
+ const auto pendingCommit = PendingCommitCheckpoints.size();
+ Metrics.Pending->Set(pending);
+ Metrics.PendingCommit->Set(pendingCommit);
+ Metrics.InProgress->Set(pending + pendingCommit);
+}
+
+void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr& ev) {
CC_LOG_D("Got TEvRegisterCoordinatorResponse; issues: " << ev->Get()->Issues.ToOneLineString());
- const auto& issues = ev->Get()->Issues;
- if (issues) {
+ const auto& issues = ev->Get()->Issues;
+ if (issues) {
auto message = "Can't register in storage: " + issues.ToOneLineString();
CC_LOG_E(message);
- ++*Metrics.StorageError;
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message));
- return;
- }
-
+ ++*Metrics.StorageError;
+ Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message));
+ return;
+ }
+
CC_LOG_D("Successfully registered in storage");
CC_LOG_I("Send TEvNewCheckpointCoordinator to " << AllActors.size() << " actor(s)");
for (const auto& [actor, transport] : AllActors) {
transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinator(CoordinatorId.Generation, CoordinatorId.GraphId));
- }
+ }
const bool needCheckpointMetadata = StateLoadMode == YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT || StreamingDisposition.has_from_last_checkpoint();
if (needCheckpointMetadata) {
@@ -142,9 +142,9 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord
ScheduleNextCheckpoint();
} else {
Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(TStringBuilder() << "Unexpected state load mode (" << YandexQuery::StateLoadMode_Name(StateLoadMode) << ") and streaming disposition " << StreamingDisposition));
- }
-}
-
+ }
+}
+
void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck::TPtr& ev) {
if (!OnComputeActorEventReceived(ev)) {
return;
@@ -159,42 +159,42 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpo
}
}
-void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr& ev) {
- const auto event = ev->Get();
- const auto& checkpoints = event->Checkpoints;
+void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr& ev) {
+ const auto event = ev->Get();
+ const auto& checkpoints = event->Checkpoints;
CC_LOG_D("Got TEvGetCheckpointsMetadataResponse");
- Y_VERIFY(!PendingRestoreCheckpoint);
-
- if (event->Issues) {
- ++*Metrics.StorageError;
+ Y_VERIFY(!PendingRestoreCheckpoint);
+
+ if (event->Issues) {
+ ++*Metrics.StorageError;
auto message = "Can't get checkpoints to restore: " + event->Issues.ToOneLineString();
CC_LOG_E(message);
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message));
- return;
- }
-
- Y_VERIFY(checkpoints.size() < 2);
- if (!checkpoints.empty()) {
- const auto& checkpoint = checkpoints.at(0);
- CheckpointIdGenerator = std::make_unique<TCheckpointIdGenerator>(CoordinatorId, checkpoint.CheckpointId);
+ Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message));
+ return;
+ }
+
+ Y_VERIFY(checkpoints.size() < 2);
+ if (!checkpoints.empty()) {
+ const auto& checkpoint = checkpoints.at(0);
+ CheckpointIdGenerator = std::make_unique<TCheckpointIdGenerator>(CoordinatorId, checkpoint.CheckpointId);
const bool needRestoreOffsets = StateLoadMode == YandexQuery::StateLoadMode::EMPTY && StreamingDisposition.has_from_last_checkpoint();
if (needRestoreOffsets) {
TryToRestoreOffsetsFromForeignCheckpoint(checkpoint);
} else {
RestoreFromOwnCheckpoint(checkpoint);
- }
- return;
- }
-
+ }
+ return;
+ }
+
// Not restored from existing checkpoint. Init zero checkpoint
++*Metrics.StartedFromEmptyCheckpoint;
CheckpointIdGenerator = std::make_unique<TCheckpointIdGenerator>(CoordinatorId);
CC_LOG_I("Found no checkpoints to restore from, creating a 'zero' checkpoint");
InitingZeroCheckpoint = true;
- InitCheckpoint();
- ScheduleNextCheckpoint();
-}
-
+ InitCheckpoint();
+ ScheduleNextCheckpoint();
+}
+
void TCheckpointCoordinator::RestoreFromOwnCheckpoint(const TCheckpointMetadata& checkpoint) {
CC_LOG_I("Will restore from checkpoint " << checkpoint.CheckpointId);
PendingRestoreCheckpoint = TPendingRestoreCheckpoint(checkpoint.CheckpointId, checkpoint.Status == ECheckpointStatus::PendingCommit, ActorsToWaitForSet);
@@ -256,77 +256,77 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe
}
}
-void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult::TPtr& ev) {
+void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult::TPtr& ev) {
if (!OnComputeActorEventReceived(ev)) {
return;
}
- const auto& record = ev->Get()->Record;
- const auto& checkpointProto = record.GetCheckpoint();
- const TCheckpointId checkpoint(checkpointProto.GetGeneration(), checkpointProto.GetId());
- const auto& status = record.GetStatus();
- const TString& statusName = NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_Name(status);
- CC_LOG_D("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult; taskId: "<< record.GetTaskId()
- << ", checkpoint: " << checkpoint
+ const auto& record = ev->Get()->Record;
+ const auto& checkpointProto = record.GetCheckpoint();
+ const TCheckpointId checkpoint(checkpointProto.GetGeneration(), checkpointProto.GetId());
+ const auto& status = record.GetStatus();
+ const TString& statusName = NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_Name(status);
+ CC_LOG_D("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult; taskId: "<< record.GetTaskId()
+ << ", checkpoint: " << checkpoint
<< ", status: " << statusName);
-
- if (!PendingRestoreCheckpoint) {
+
+ if (!PendingRestoreCheckpoint) {
CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint");
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint"));
- return;
- }
-
- if (PendingRestoreCheckpoint->CheckpointId != checkpoint) {
+ Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint"));
+ return;
+ }
+
+ if (PendingRestoreCheckpoint->CheckpointId != checkpoint) {
CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult event with unexpected checkpoint: " << checkpoint << ", expected: " << PendingRestoreCheckpoint->CheckpointId);
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got unexpected checkpoint"));
- return;
- }
-
- if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) {
+ Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got unexpected checkpoint"));
+ return;
+ }
+
+ if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) {
CC_LOG_E("[" << checkpoint << "] Can't restore: " << statusName);
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Aborted("Can't restore: " + statusName));
- return;
- }
-
- PendingRestoreCheckpoint->Acknowledge(ev->Sender);
+ Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Aborted("Can't restore: " + statusName));
+ return;
+ }
+
+ PendingRestoreCheckpoint->Acknowledge(ev->Sender);
CC_LOG_D("[" << checkpoint << "] Task state restored, need " << PendingRestoreCheckpoint->NotYetAcknowledgedCount() << " more acks");
-
- if (PendingRestoreCheckpoint->GotAllAcknowledges()) {
+
+ if (PendingRestoreCheckpoint->GotAllAcknowledges()) {
if (PendingInit) {
PendingInit = nullptr;
}
- if (PendingRestoreCheckpoint->CommitAfterRestore) {
+ if (PendingRestoreCheckpoint->CommitAfterRestore) {
CC_LOG_I("[" << checkpoint << "] State restored, send TEvCommitState to " << ActorsToNotify.size() << " actor(s)");
PendingCommitCheckpoints.emplace(checkpoint, TPendingCheckpoint(ActorsToNotifySet));
- UpdateInProgressMetric();
+ UpdateInProgressMetric();
for (const auto& [actor, transport] : ActorsToNotify) {
transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvCommitState(checkpoint.SeqNo, checkpoint.CoordinatorGeneration, CoordinatorId.Generation));
- }
- }
-
+ }
+ }
+
if (RestoringFromForeignCheckpoint) {
InitingZeroCheckpoint = true;
InitCheckpoint();
}
- ScheduleNextCheckpoint();
+ ScheduleNextCheckpoint();
CC_LOG_I("[" << checkpoint << "] State restored, send TEvRun to " << AllActors.size() << " actors");
for (const auto& [actor, transport] : AllActors) {
transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvRun());
- }
- }
-}
-
-void TCheckpointCoordinator::InitCheckpoint() {
- Y_VERIFY(CheckpointIdGenerator);
- const auto nextCheckpointId = CheckpointIdGenerator->NextId();
+ }
+ }
+}
+
+void TCheckpointCoordinator::InitCheckpoint() {
+ Y_VERIFY(CheckpointIdGenerator);
+ const auto nextCheckpointId = CheckpointIdGenerator->NextId();
CC_LOG_I("[" << nextCheckpointId << "] Registering new checkpoint in storage");
-
+
PendingCheckpoints.emplace(nextCheckpointId, TPendingCheckpoint(ActorsToWaitForSet));
- UpdateInProgressMetric();
- ++*Metrics.Total;
-
+ UpdateInProgressMetric();
+ ++*Metrics.Total;
+
std::unique_ptr<TEvCheckpointStorage::TEvCreateCheckpointRequest> req;
if (GraphDescId) {
req = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointRequest>(CoordinatorId, nextCheckpointId, ActorsToWaitForSet.size(), GraphDescId);
@@ -337,35 +337,35 @@ void TCheckpointCoordinator::InitCheckpoint() {
}
Send(StorageProxy, req.release(), IEventHandle::FlagTrackDelivery);
-}
-
-void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleCheckpointing::TPtr&) {
+}
+
+void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleCheckpointing::TPtr&) {
CC_LOG_D("Got TEvScheduleCheckpointing");
- ScheduleNextCheckpoint();
- const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size();
+ ScheduleNextCheckpoint();
+ const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size();
if (checkpointsInFly >= Settings.GetMaxInflight() || InitingZeroCheckpoint) {
CC_LOG_W("Skip schedule checkpoint event since inflight checkpoint limit exceeded: current: " << checkpointsInFly << ", limit: " << Settings.GetMaxInflight());
- Metrics.SkippedDueToInFlightLimit->Inc();
- return;
- }
- Metrics.SkippedDueToInFlightLimit->Set(0);
- InitCheckpoint();
-}
-
-void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpointResponse::TPtr& ev) {
- const auto& checkpointId = ev->Get()->CheckpointId;
- const auto& issues = ev->Get()->Issues;
+ Metrics.SkippedDueToInFlightLimit->Inc();
+ return;
+ }
+ Metrics.SkippedDueToInFlightLimit->Set(0);
+ InitCheckpoint();
+}
+
+void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpointResponse::TPtr& ev) {
+ const auto& checkpointId = ev->Get()->CheckpointId;
+ const auto& issues = ev->Get()->Issues;
CC_LOG_D("[" << checkpointId << "] Got TEvCreateCheckpointResponse");
-
- if (issues) {
+
+ if (issues) {
CC_LOG_E("[" << checkpointId << "] Can't create checkpoint: " << issues.ToOneLineString());
- PendingCheckpoints.erase(checkpointId);
- UpdateInProgressMetric();
- ++*Metrics.FailedToCreate;
- ++*Metrics.StorageError;
- return;
- }
-
+ PendingCheckpoints.erase(checkpointId);
+ UpdateInProgressMetric();
+ ++*Metrics.FailedToCreate;
+ ++*Metrics.StorageError;
+ return;
+ }
+
if (GraphDescId) {
Y_VERIFY(GraphDescId == ev->Get()->GraphDescId);
} else {
@@ -381,156 +381,156 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpo
}
} else {
InjectCheckpoint(checkpointId);
- }
+ }
}
-
+
void TCheckpointCoordinator::InjectCheckpoint(const TCheckpointId& checkpointId) {
CC_LOG_I("[" << checkpointId << "] Checkpoint successfully created, going to inject barriers to " << ActorsToTrigger.size() << " actor(s)");
for (const auto& [toTrigger, transport] : ActorsToTrigger) {
transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvInjectCheckpoint(checkpointId.SeqNo, checkpointId.CoordinatorGeneration));
}
- if (!GraphIsRunning) {
+ if (!GraphIsRunning) {
CC_LOG_I("[" << checkpointId << "] Send TEvRun to all actors");
for (const auto& [actor, transport] : AllActors) {
transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvRun());
- }
- GraphIsRunning = true;
- }
-}
-
-void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult::TPtr& ev) {
- const auto& proto = ev->Get()->Record;
- const auto& checkpointProto = proto.GetCheckpoint();
- const auto& status = proto.GetStatus();
- const TString& statusName = NYql::NDqProto::TEvSaveTaskStateResult_EStatus_Name(status);
-
+ }
+ GraphIsRunning = true;
+ }
+}
+
+void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult::TPtr& ev) {
+ const auto& proto = ev->Get()->Record;
+ const auto& checkpointProto = proto.GetCheckpoint();
+ const auto& status = proto.GetStatus();
+ const TString& statusName = NYql::NDqProto::TEvSaveTaskStateResult_EStatus_Name(status);
+
if (!OnComputeActorEventReceived(ev)) {
return;
}
- TCheckpointId checkpointId(checkpointProto.GetGeneration(), checkpointProto.GetId());
-
- CC_LOG_D("[" << checkpointId << "] Got TEvSaveTaskStateResult; task " << proto.GetTaskId()
+ TCheckpointId checkpointId(checkpointProto.GetGeneration(), checkpointProto.GetId());
+
+ CC_LOG_D("[" << checkpointId << "] Got TEvSaveTaskStateResult; task " << proto.GetTaskId()
<< ", status: " << statusName << ", size: " << proto.GetStateSizeBytes());
-
- const auto it = PendingCheckpoints.find(checkpointId);
- if (it == PendingCheckpoints.end()) {
- return;
- }
- auto& checkpoint = it->second;
-
- if (status == NYql::NDqProto::TEvSaveTaskStateResult::OK) {
- checkpoint.Acknowledge(ev->Sender, proto.GetStateSizeBytes());
+
+ const auto it = PendingCheckpoints.find(checkpointId);
+ if (it == PendingCheckpoints.end()) {
+ return;
+ }
+ auto& checkpoint = it->second;
+
+ if (status == NYql::NDqProto::TEvSaveTaskStateResult::OK) {
+ checkpoint.Acknowledge(ev->Sender, proto.GetStateSizeBytes());
CC_LOG_D("[" << checkpointId << "] Task state saved, need " << checkpoint.NotYetAcknowledgedCount() << " more acks");
- if (checkpoint.GotAllAcknowledges()) {
+ if (checkpoint.GotAllAcknowledges()) {
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'");
- Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId), IEventHandle::FlagTrackDelivery);
+ Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId), IEventHandle::FlagTrackDelivery);
if (InitingZeroCheckpoint) {
Send(RunActorId, new TEvCheckpointCoordinator::TEvZeroCheckpointDone());
}
- }
- } else {
+ }
+ } else {
CC_LOG_E("[" << checkpointId << "] Can't save node state, aborting checkpoint");
- Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
- }
-}
-
-void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse::TPtr& ev) {
- const auto& checkpointId = ev->Get()->CheckpointId;
- const auto issues = ev->Get()->Issues;
+ Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
+ }
+}
+
+void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse::TPtr& ev) {
+ const auto& checkpointId = ev->Get()->CheckpointId;
+ const auto issues = ev->Get()->Issues;
CC_LOG_D("[" << checkpointId << "] Got TEvSetCheckpointPendingCommitStatusResponse");
- const auto it = PendingCheckpoints.find(checkpointId);
- if (it == PendingCheckpoints.end()) {
+ const auto it = PendingCheckpoints.find(checkpointId);
+ if (it == PendingCheckpoints.end()) {
CC_LOG_W("[" << checkpointId << "] Got TEvSetCheckpointPendingCommitStatusResponse for checkpoint but it is not in PendingCheckpoints");
- return;
- }
-
- if (issues) {
+ return;
+ }
+
+ if (issues) {
CC_LOG_E("[" << checkpointId << "] Can't change checkpoint status to 'PendingCommit': " << issues.ToString());
- ++*Metrics.StorageError;
- PendingCheckpoints.erase(it);
- return;
- }
-
+ ++*Metrics.StorageError;
+ PendingCheckpoints.erase(it);
+ return;
+ }
+
CC_LOG_I("[" << checkpointId << "] Checkpoint status changed to 'PendingCommit', committing states");
PendingCommitCheckpoints.emplace(checkpointId, TPendingCheckpoint(ActorsToNotifySet, it->second.GetStats()));
- PendingCheckpoints.erase(it);
- UpdateInProgressMetric();
+ PendingCheckpoints.erase(it);
+ UpdateInProgressMetric();
for (const auto& [toTrigger, transport] : ActorsToNotify) {
transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvCommitState(checkpointId.SeqNo, checkpointId.CoordinatorGeneration, CoordinatorId.Generation));
- }
-}
-
-void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvStateCommitted::TPtr& ev) {
+ }
+}
+
+void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvStateCommitted::TPtr& ev) {
if (!OnComputeActorEventReceived(ev)) {
return;
}
- const auto& checkpointPb = ev->Get()->Record.GetCheckpoint();
- TCheckpointId checkpointId(checkpointPb.GetGeneration(), checkpointPb.GetId());
+ const auto& checkpointPb = ev->Get()->Record.GetCheckpoint();
+ TCheckpointId checkpointId(checkpointPb.GetGeneration(), checkpointPb.GetId());
CC_LOG_D("[" << checkpointId << "] Got TEvStateCommitted; task: " << ev->Get()->Record.GetTaskId());
- const auto it = PendingCommitCheckpoints.find(checkpointId);
- if (it == PendingCommitCheckpoints.end()) {
+ const auto it = PendingCommitCheckpoints.find(checkpointId);
+ if (it == PendingCommitCheckpoints.end()) {
CC_LOG_W("[" << checkpointId << "] Got TEvStateCommitted for checkpoint " << checkpointId << " but it is not in PendingCommitCheckpoints");
- return;
- }
-
- auto& checkpoint = it->second;
- checkpoint.Acknowledge(ev->Sender);
+ return;
+ }
+
+ auto& checkpoint = it->second;
+ checkpoint.Acknowledge(ev->Sender);
CC_LOG_D("[" << checkpointId << "] State committed " << ev->Sender.ToString() << ", need " << checkpoint.NotYetAcknowledgedCount() << " more acks");
- if (checkpoint.GotAllAcknowledges()) {
+ if (checkpoint.GotAllAcknowledges()) {
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'Completed'");
- const auto& stats = checkpoint.GetStats();
- auto durationMs = (TInstant::Now() - stats.CreatedAt).MilliSeconds();
- Metrics.LastCheckpointBarrierDeliveryTimeMillis->Set(durationMs);
- Metrics.CheckpointBarrierDeliveryTimeMillis->Collect(durationMs);
- Send(StorageProxy, new TEvCheckpointStorage::TEvCompleteCheckpointRequest(CoordinatorId, checkpointId), IEventHandle::FlagTrackDelivery);
- }
-}
-
-void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr& ev) {
- const auto& checkpointId = ev->Get()->CheckpointId;
+ const auto& stats = checkpoint.GetStats();
+ auto durationMs = (TInstant::Now() - stats.CreatedAt).MilliSeconds();
+ Metrics.LastCheckpointBarrierDeliveryTimeMillis->Set(durationMs);
+ Metrics.CheckpointBarrierDeliveryTimeMillis->Collect(durationMs);
+ Send(StorageProxy, new TEvCheckpointStorage::TEvCompleteCheckpointRequest(CoordinatorId, checkpointId), IEventHandle::FlagTrackDelivery);
+ }
+}
+
+void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr& ev) {
+ const auto& checkpointId = ev->Get()->CheckpointId;
CC_LOG_D("[" << checkpointId << "] Got TEvCompleteCheckpointResponse");
- const auto it = PendingCommitCheckpoints.find(checkpointId);
- if (it == PendingCommitCheckpoints.end()) {
+ const auto it = PendingCommitCheckpoints.find(checkpointId);
+ if (it == PendingCommitCheckpoints.end()) {
CC_LOG_W("[" << checkpointId << "] Got TEvCompleteCheckpointResponse but related checkpoint is not in progress; checkpointId: " << checkpointId);
- return;
- }
- const auto& issues = ev->Get()->Issues;
- if (!issues) {
- const auto& stats = it->second.GetStats();
- auto durationMs = (TInstant::Now() - stats.CreatedAt).MilliSeconds();
- Metrics.LastCheckpointDurationMillis->Set(durationMs);
- Metrics.LastCheckpointSizeBytes->Set(stats.StateSize);
- Metrics.CheckpointDurationMillis->Collect(durationMs);
- Metrics.CheckpointSizeBytes->Collect(stats.StateSize);
- ++*Metrics.Completed;
+ return;
+ }
+ const auto& issues = ev->Get()->Issues;
+ if (!issues) {
+ const auto& stats = it->second.GetStats();
+ auto durationMs = (TInstant::Now() - stats.CreatedAt).MilliSeconds();
+ Metrics.LastCheckpointDurationMillis->Set(durationMs);
+ Metrics.LastCheckpointSizeBytes->Set(stats.StateSize);
+ Metrics.CheckpointDurationMillis->Collect(durationMs);
+ Metrics.CheckpointSizeBytes->Collect(stats.StateSize);
+ ++*Metrics.Completed;
CC_LOG_I("[" << checkpointId << "] Checkpoint completed");
- } else {
- ++*Metrics.StorageError;
+ } else {
+ ++*Metrics.StorageError;
CC_LOG_E("[" << checkpointId << "] Can't change checkpoint status to 'Completed': " << issues.ToString());
- }
- PendingCommitCheckpoints.erase(it);
- UpdateInProgressMetric();
-}
-
-void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr& ev) {
- const auto& checkpointId = ev->Get()->CheckpointId;
+ }
+ PendingCommitCheckpoints.erase(it);
+ UpdateInProgressMetric();
+}
+
+void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr& ev) {
+ const auto& checkpointId = ev->Get()->CheckpointId;
CC_LOG_D("[" << checkpointId << "] Got TEvAbortCheckpointResponse");
- const auto& issues = ev->Get()->Issues;
- if (issues) {
+ const auto& issues = ev->Get()->Issues;
+ if (issues) {
CC_LOG_E("[" << checkpointId << "] Can't abort checkpoint: " << issues.ToString());
- ++*Metrics.StorageError;
- } else {
+ ++*Metrics.StorageError;
+ } else {
CC_LOG_W("[" << checkpointId << "] Checkpoint aborted");
- ++*Metrics.Aborted;
- }
- PendingCheckpoints.erase(checkpointId);
- PendingCommitCheckpoints.erase(checkpointId);
- UpdateInProgressMetric();
-}
-
+ ++*Metrics.Aborted;
+ }
+ PendingCheckpoints.erase(checkpointId);
+ PendingCommitCheckpoints.erase(checkpointId);
+ UpdateInProgressMetric();
+}
+
void TCheckpointCoordinator::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
const auto actorIt = TaskIdToActor.find(ev->Get()->EventQueueId);
Y_VERIFY(actorIt != TaskIdToActor.end());
@@ -561,14 +561,14 @@ void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
PassAway();
}
-void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- TStringStream message;
- message << "Got TEvUndelivered; reason: " << ev->Get()->Reason << ", sourceType: " << ev->Get()->SourceType;
+void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
+ TStringStream message;
+ message << "Got TEvUndelivered; reason: " << ev->Get()->Reason << ", sourceType: " << ev->Get()->SourceType;
CC_LOG_D(message.Str());
Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Unavailable(message.Str()));
- PassAway();
-}
-
+ PassAway();
+}
+
void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&) {
InitingZeroCheckpoint = false;
// TODO: run graph only now, not before zero checkpoint inited
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
index 0bb76685e1..5c820f903d 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
@@ -1,92 +1,92 @@
-#pragma once
-
-#include "checkpoint_id_generator.h"
-#include "pending_checkpoint.h"
-
+#pragma once
+
+#include "checkpoint_id_generator.h"
+#include "pending_checkpoint.h"
+
#include <ydb/core/yq/libs/checkpointing/events/events.h>
#include <ydb/core/yq/libs/checkpointing_common/defs.h>
#include <ydb/core/yq/libs/checkpoint_storage/events/events.h>
-
+
#include <ydb/core/yq/libs/config/protos/checkpoint_coordinator.pb.h>
#include <ydb/public/api/protos/yq.pb.h>
-
+
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/compute/retry_queue.h>
#include <ydb/library/yql/providers/dq/actors/events.h>
-
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
namespace NYq {
-
-using namespace NActors;
+
+using namespace NActors;
using namespace NYq::NConfig;
-
-class TCheckpointCoordinator : public TActorBootstrapped<TCheckpointCoordinator> {
-public:
- TCheckpointCoordinator(TCoordinatorId coordinatorId,
- const TActorId& taskControllerId,
+
+class TCheckpointCoordinator : public TActorBootstrapped<TCheckpointCoordinator> {
+public:
+ TCheckpointCoordinator(TCoordinatorId coordinatorId,
+ const TActorId& taskControllerId,
const TActorId& storageProxy,
const TActorId& runActorId,
const TCheckpointCoordinatorConfig& settings,
- const NMonitoring::TDynamicCounterPtr& counters,
+ const NMonitoring::TDynamicCounterPtr& counters,
const NProto::TGraphParams& graphParams,
const YandexQuery::StateLoadMode& stateLoadMode,
const YandexQuery::StreamingDisposition& streamingDisposition);
-
+
void Handle(const NYql::NDqs::TEvReadyState::TPtr&);
- void Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr&);
+ void Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr&);
void Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck::TPtr&);
- void Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr&);
- void Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult::TPtr&);
- void Handle(const TEvCheckpointCoordinator::TEvScheduleCheckpointing::TPtr&);
- void Handle(const TEvCheckpointStorage::TEvCreateCheckpointResponse::TPtr&);
- void Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult::TPtr&);
- void Handle(const TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse::TPtr&);
- void Handle(const NYql::NDq::TEvDqCompute::TEvStateCommitted::TPtr&);
- void Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr&);
- void Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr&);
+ void Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr&);
+ void Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult::TPtr&);
+ void Handle(const TEvCheckpointCoordinator::TEvScheduleCheckpointing::TPtr&);
+ void Handle(const TEvCheckpointStorage::TEvCreateCheckpointResponse::TPtr&);
+ void Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult::TPtr&);
+ void Handle(const TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse::TPtr&);
+ void Handle(const NYql::NDq::TEvDqCompute::TEvStateCommitted::TPtr&);
+ void Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr&);
+ void Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev);
void Handle(NActors::TEvents::TEvPoison::TPtr&);
- void Handle(NActors::TEvents::TEvUndelivered::TPtr&);
+ void Handle(NActors::TEvents::TEvUndelivered::TPtr&);
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&);
-
-
- STRICT_STFUNC(DispatchEvent,
+
+
+ STRICT_STFUNC(DispatchEvent,
hFunc(NYql::NDqs::TEvReadyState, Handle)
- hFunc(TEvCheckpointStorage::TEvRegisterCoordinatorResponse, Handle)
+ hFunc(TEvCheckpointStorage::TEvRegisterCoordinatorResponse, Handle)
hFunc(NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck, Handle)
- hFunc(TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse, Handle)
- hFunc(NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult, Handle)
- hFunc(TEvCheckpointCoordinator::TEvScheduleCheckpointing, Handle)
- hFunc(TEvCheckpointStorage::TEvCreateCheckpointResponse, Handle)
- hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult, Handle)
- hFunc(TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse, Handle)
- hFunc(NYql::NDq::TEvDqCompute::TEvStateCommitted, Handle)
- hFunc(TEvCheckpointStorage::TEvCompleteCheckpointResponse, Handle)
- hFunc(TEvCheckpointStorage::TEvAbortCheckpointResponse, Handle)
+ hFunc(TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse, Handle)
+ hFunc(NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult, Handle)
+ hFunc(TEvCheckpointCoordinator::TEvScheduleCheckpointing, Handle)
+ hFunc(TEvCheckpointStorage::TEvCreateCheckpointResponse, Handle)
+ hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult, Handle)
+ hFunc(TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse, Handle)
+ hFunc(NYql::NDq::TEvDqCompute::TEvStateCommitted, Handle)
+ hFunc(TEvCheckpointStorage::TEvCompleteCheckpointResponse, Handle)
+ hFunc(TEvCheckpointStorage::TEvAbortCheckpointResponse, Handle)
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle)
- hFunc(NActors::TEvents::TEvPoison, Handle)
- hFunc(NActors::TEvents::TEvUndelivered, Handle)
+ hFunc(NActors::TEvents::TEvPoison, Handle)
+ hFunc(NActors::TEvents::TEvUndelivered, Handle)
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle)
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle)
hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle)
- )
-
- void Bootstrap();
-
+ )
+
+ void Bootstrap();
+
static constexpr char ActorName[] = "YQ_CHECKPOINT_COORDINATOR";
-private:
- void InitCheckpoint();
+private:
+ void InitCheckpoint();
void InjectCheckpoint(const TCheckpointId& checkpointId);
- void ScheduleNextCheckpoint();
- void UpdateInProgressMetric();
+ void ScheduleNextCheckpoint();
+ void UpdateInProgressMetric();
void PassAway() override;
void RestoreFromOwnCheckpoint(const TCheckpointMetadata& checkpoint);
void TryToRestoreOffsetsFromForeignCheckpoint(const TCheckpointMetadata& checkpoint);
-
+
template <class TEvPtr>
bool OnComputeActorEventReceived(TEvPtr& ev) {
const auto actorIt = AllActors.find(ev->Sender);
@@ -94,65 +94,65 @@ private:
return actorIt->second->EventsQueue.OnEventReceived(ev);
}
- struct TCheckpointCoordinatorMetrics {
- TCheckpointCoordinatorMetrics(const NMonitoring::TDynamicCounterPtr& counters) {
- auto subgroup = counters->GetSubgroup("subsystem", "checkpoint_coordinator");
- InProgress = subgroup->GetCounter("InProgress");
- Pending = subgroup->GetCounter("Pending");
- PendingCommit = subgroup->GetCounter("PendingCommit");
- Completed = subgroup->GetCounter("CompletedCheckpoints", true);
- Aborted = subgroup->GetCounter("AbortedCheckpoints", true);
- StorageError = subgroup->GetCounter("StorageError", true);
- FailedToCreate = subgroup->GetCounter("FailedToCreate", true);
- Total = subgroup->GetCounter("TotalCheckpoints", true);
- LastCheckpointBarrierDeliveryTimeMillis = subgroup->GetCounter("LastCheckpointBarrierDeliveryTimeMillis");
- LastCheckpointDurationMillis = subgroup->GetCounter("LastSuccessfulCheckpointDurationMillis");
- LastCheckpointSizeBytes = subgroup->GetCounter("LastSuccessfulCheckpointSizeBytes");
- CheckpointBarrierDeliveryTimeMillis = subgroup->GetHistogram("CheckpointBarrierDeliveryTimeMillis", NMonitoring::ExponentialHistogram(12, 2, 1024)); // ~ 1s -> ~ 1 h
- CheckpointDurationMillis = subgroup->GetHistogram("CheckpointDurationMillis", NMonitoring::ExponentialHistogram(12, 2, 1024)); // ~ 1s -> ~ 1 h
- CheckpointSizeBytes = subgroup->GetHistogram("CheckpointSizeBytes", NMonitoring::ExponentialHistogram(8, 32, 32)); // 32b -> 1Tb
- SkippedDueToInFlightLimit = subgroup->GetCounter("SkippedDueToInFlightLimit");
+ struct TCheckpointCoordinatorMetrics {
+ TCheckpointCoordinatorMetrics(const NMonitoring::TDynamicCounterPtr& counters) {
+ auto subgroup = counters->GetSubgroup("subsystem", "checkpoint_coordinator");
+ InProgress = subgroup->GetCounter("InProgress");
+ Pending = subgroup->GetCounter("Pending");
+ PendingCommit = subgroup->GetCounter("PendingCommit");
+ Completed = subgroup->GetCounter("CompletedCheckpoints", true);
+ Aborted = subgroup->GetCounter("AbortedCheckpoints", true);
+ StorageError = subgroup->GetCounter("StorageError", true);
+ FailedToCreate = subgroup->GetCounter("FailedToCreate", true);
+ Total = subgroup->GetCounter("TotalCheckpoints", true);
+ LastCheckpointBarrierDeliveryTimeMillis = subgroup->GetCounter("LastCheckpointBarrierDeliveryTimeMillis");
+ LastCheckpointDurationMillis = subgroup->GetCounter("LastSuccessfulCheckpointDurationMillis");
+ LastCheckpointSizeBytes = subgroup->GetCounter("LastSuccessfulCheckpointSizeBytes");
+ CheckpointBarrierDeliveryTimeMillis = subgroup->GetHistogram("CheckpointBarrierDeliveryTimeMillis", NMonitoring::ExponentialHistogram(12, 2, 1024)); // ~ 1s -> ~ 1 h
+ CheckpointDurationMillis = subgroup->GetHistogram("CheckpointDurationMillis", NMonitoring::ExponentialHistogram(12, 2, 1024)); // ~ 1s -> ~ 1 h
+ CheckpointSizeBytes = subgroup->GetHistogram("CheckpointSizeBytes", NMonitoring::ExponentialHistogram(8, 32, 32)); // 32b -> 1Tb
+ SkippedDueToInFlightLimit = subgroup->GetCounter("SkippedDueToInFlightLimit");
RestoredFromSavedCheckpoint = subgroup->GetCounter("RestoredFromSavedCheckpoint", true);
StartedFromEmptyCheckpoint = subgroup->GetCounter("StartedFromEmptyCheckpoint", true);
RestoredStreamingOffsetsFromCheckpoint = subgroup->GetCounter("RestoredStreamingOffsetsFromCheckpoint", true);
- }
-
- NMonitoring::TDynamicCounters::TCounterPtr InProgress;
- NMonitoring::TDynamicCounters::TCounterPtr Pending;
- NMonitoring::TDynamicCounters::TCounterPtr PendingCommit;
- NMonitoring::TDynamicCounters::TCounterPtr Completed;
- NMonitoring::TDynamicCounters::TCounterPtr Aborted;
- NMonitoring::TDynamicCounters::TCounterPtr StorageError;
- NMonitoring::TDynamicCounters::TCounterPtr FailedToCreate;
- NMonitoring::TDynamicCounters::TCounterPtr Total;
- NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointBarrierDeliveryTimeMillis;
- NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointDurationMillis;
- NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointSizeBytes;
- NMonitoring::TDynamicCounters::TCounterPtr SkippedDueToInFlightLimit;
+ }
+
+ NMonitoring::TDynamicCounters::TCounterPtr InProgress;
+ NMonitoring::TDynamicCounters::TCounterPtr Pending;
+ NMonitoring::TDynamicCounters::TCounterPtr PendingCommit;
+ NMonitoring::TDynamicCounters::TCounterPtr Completed;
+ NMonitoring::TDynamicCounters::TCounterPtr Aborted;
+ NMonitoring::TDynamicCounters::TCounterPtr StorageError;
+ NMonitoring::TDynamicCounters::TCounterPtr FailedToCreate;
+ NMonitoring::TDynamicCounters::TCounterPtr Total;
+ NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointBarrierDeliveryTimeMillis;
+ NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointDurationMillis;
+ NMonitoring::TDynamicCounters::TCounterPtr LastCheckpointSizeBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr SkippedDueToInFlightLimit;
NMonitoring::TDynamicCounters::TCounterPtr RestoredFromSavedCheckpoint;
NMonitoring::TDynamicCounters::TCounterPtr StartedFromEmptyCheckpoint;
NMonitoring::TDynamicCounters::TCounterPtr RestoredStreamingOffsetsFromCheckpoint;
- NMonitoring::THistogramPtr CheckpointBarrierDeliveryTimeMillis;
- NMonitoring::THistogramPtr CheckpointDurationMillis;
- NMonitoring::THistogramPtr CheckpointSizeBytes;
- };
-
+ NMonitoring::THistogramPtr CheckpointBarrierDeliveryTimeMillis;
+ NMonitoring::THistogramPtr CheckpointDurationMillis;
+ NMonitoring::THistogramPtr CheckpointSizeBytes;
+ };
+
struct TComputeActorTransportStuff : public TSimpleRefCount<TComputeActorTransportStuff> {
using TPtr = TIntrusivePtr<TComputeActorTransportStuff>;
NYql::NDq::TRetryEventsQueue EventsQueue;
};
- const TCoordinatorId CoordinatorId;
- const TActorId TaskControllerId;
- const TActorId StorageProxy;
+ const TCoordinatorId CoordinatorId;
+ const TActorId TaskControllerId;
+ const TActorId StorageProxy;
const TActorId RunActorId;
- std::unique_ptr<TCheckpointIdGenerator> CheckpointIdGenerator;
+ std::unique_ptr<TCheckpointIdGenerator> CheckpointIdGenerator;
TCheckpointCoordinatorConfig Settings;
- const TDuration CheckpointingPeriod;
+ const TDuration CheckpointingPeriod;
const NProto::TGraphParams GraphParams;
TString GraphDescId;
-
+
THashMap<TActorId, TComputeActorTransportStuff::TPtr> AllActors;
THashSet<TActorId> AllActorsSet;
THashMap<TActorId, TComputeActorTransportStuff::TPtr> ActorsToTrigger;
@@ -161,20 +161,20 @@ private:
THashMap<TActorId, TComputeActorTransportStuff::TPtr> ActorsToNotify;
THashSet<TActorId> ActorsToNotifySet;
THashMap<ui64, TActorId> TaskIdToActor; // Task id -> actor.
- THashMap<TCheckpointId, TPendingCheckpoint, TCheckpointIdHash> PendingCheckpoints;
- THashMap<TCheckpointId, TPendingCheckpoint, TCheckpointIdHash> PendingCommitCheckpoints;
- TMaybe<TPendingRestoreCheckpoint> PendingRestoreCheckpoint;
+ THashMap<TCheckpointId, TPendingCheckpoint, TCheckpointIdHash> PendingCheckpoints;
+ THashMap<TCheckpointId, TPendingCheckpoint, TCheckpointIdHash> PendingCommitCheckpoints;
+ TMaybe<TPendingRestoreCheckpoint> PendingRestoreCheckpoint;
std::unique_ptr<TPendingInitCoordinator> PendingInit;
- bool GraphIsRunning = false;
+ bool GraphIsRunning = false;
bool InitingZeroCheckpoint = false;
bool RestoringFromForeignCheckpoint = false;
-
- TCheckpointCoordinatorMetrics Metrics;
-
- YandexQuery::StateLoadMode StateLoadMode;
+
+ TCheckpointCoordinatorMetrics Metrics;
+
+ YandexQuery::StateLoadMode StateLoadMode;
YandexQuery::StreamingDisposition StreamingDisposition;
-};
-
+};
+
THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& executerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT, const YandexQuery::StreamingDisposition& streamingDisposition = {});
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp
index 6b415e8ac8..301ea0ac98 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.cpp
@@ -1,23 +1,23 @@
-#include "checkpoint_id_generator.h"
+#include "checkpoint_id_generator.h"
#include <ydb/core/yq/libs/checkpointing_common/defs.h>
namespace NYq {
-
-TCheckpointIdGenerator::TCheckpointIdGenerator(TCoordinatorId coordinatorId, TCheckpointId lastCheckpoint)
- : CoordinatorId(std::move(coordinatorId)) {
- if (CoordinatorId.Generation > lastCheckpoint.CoordinatorGeneration) {
- NextNumber = 1;
- } else if (CoordinatorId.Generation == lastCheckpoint.CoordinatorGeneration) {
- NextNumber = lastCheckpoint.SeqNo + 1;
- } else {
- ythrow yexception() << "Unexpected CheckpointCoordinator generation: " << CoordinatorId.Generation
- << " while last checkpoint has " << lastCheckpoint.CoordinatorGeneration;
- }
-}
-
+
+TCheckpointIdGenerator::TCheckpointIdGenerator(TCoordinatorId coordinatorId, TCheckpointId lastCheckpoint)
+ : CoordinatorId(std::move(coordinatorId)) {
+ if (CoordinatorId.Generation > lastCheckpoint.CoordinatorGeneration) {
+ NextNumber = 1;
+ } else if (CoordinatorId.Generation == lastCheckpoint.CoordinatorGeneration) {
+ NextNumber = lastCheckpoint.SeqNo + 1;
+ } else {
+ ythrow yexception() << "Unexpected CheckpointCoordinator generation: " << CoordinatorId.Generation
+ << " while last checkpoint has " << lastCheckpoint.CoordinatorGeneration;
+ }
+}
+
TCheckpointId NYq::TCheckpointIdGenerator::NextId() {
return TCheckpointId(CoordinatorId.Generation, NextNumber++);
-}
-
+}
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h b/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h
index bcc295cf33..d902553c79 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_id_generator.h
@@ -1,18 +1,18 @@
-#pragma once
-
+#pragma once
+
#include <ydb/core/yq/libs/checkpointing_common/defs.h>
-
+
namespace NYq {
-
-class TCheckpointIdGenerator {
-private:
+
+class TCheckpointIdGenerator {
+private:
TCoordinatorId CoordinatorId;
- ui64 NextNumber;
-
-public:
- explicit TCheckpointIdGenerator(TCoordinatorId id, TCheckpointId lastCheckpoint = TCheckpointId(0, 0));
-
+ ui64 NextNumber;
+
+public:
+ explicit TCheckpointIdGenerator(TCoordinatorId id, TCheckpointId lastCheckpoint = TCheckpointId(0, 0));
+
TCheckpointId NextId();
-};
-
+};
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp b/ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp
index 01c0204e46..888580c210 100644
--- a/ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp
+++ b/ydb/core/yq/libs/checkpointing/pending_checkpoint.cpp
@@ -1,51 +1,51 @@
-#include "pending_checkpoint.h"
-
+#include "pending_checkpoint.h"
+
namespace NYq {
-
-TPendingCheckpoint::TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats)
- : NotYetAcknowledged(std::move(toBeAcknowledged))
- , Stats(std::move(stats)) {
-}
-
-void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId) {
- Acknowledge(actorId, 0);
-}
-
-void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId, ui64 stateSize) {
- NotYetAcknowledged.erase(actorId);
- Stats.StateSize += stateSize;
-}
-
-bool TPendingCheckpoint::GotAllAcknowledges() const {
- return NotYetAcknowledged.empty();
-}
-
-size_t TPendingCheckpoint::NotYetAcknowledgedCount() const {
- return NotYetAcknowledged.size();
-}
-
-const TPendingCheckpointStats& TPendingCheckpoint::GetStats() const {
- return Stats;
-}
-
-TPendingRestoreCheckpoint::TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged)
- : CheckpointId(checkpointId)
- , CommitAfterRestore(commitAfterRestore)
- , NotYetAcknowledged(std::move(toBeAcknowledged)) {
-}
-
-void TPendingRestoreCheckpoint::Acknowledge(const NActors::TActorId& actorId) {
- NotYetAcknowledged.erase(actorId);
-}
-
-bool TPendingRestoreCheckpoint::GotAllAcknowledges() const {
- return NotYetAcknowledged.empty();
-}
-
-size_t TPendingRestoreCheckpoint::NotYetAcknowledgedCount() const {
- return NotYetAcknowledged.size();
-}
-
+
+TPendingCheckpoint::TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats)
+ : NotYetAcknowledged(std::move(toBeAcknowledged))
+ , Stats(std::move(stats)) {
+}
+
+void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId) {
+ Acknowledge(actorId, 0);
+}
+
+void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId, ui64 stateSize) {
+ NotYetAcknowledged.erase(actorId);
+ Stats.StateSize += stateSize;
+}
+
+bool TPendingCheckpoint::GotAllAcknowledges() const {
+ return NotYetAcknowledged.empty();
+}
+
+size_t TPendingCheckpoint::NotYetAcknowledgedCount() const {
+ return NotYetAcknowledged.size();
+}
+
+const TPendingCheckpointStats& TPendingCheckpoint::GetStats() const {
+ return Stats;
+}
+
+TPendingRestoreCheckpoint::TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged)
+ : CheckpointId(checkpointId)
+ , CommitAfterRestore(commitAfterRestore)
+ , NotYetAcknowledged(std::move(toBeAcknowledged)) {
+}
+
+void TPendingRestoreCheckpoint::Acknowledge(const NActors::TActorId& actorId) {
+ NotYetAcknowledged.erase(actorId);
+}
+
+bool TPendingRestoreCheckpoint::GotAllAcknowledges() const {
+ return NotYetAcknowledged.empty();
+}
+
+size_t TPendingRestoreCheckpoint::NotYetAcknowledgedCount() const {
+ return NotYetAcknowledged.size();
+}
+
void TPendingInitCoordinator::OnNewCheckpointCoordinatorAck() {
++NewCheckpointCoordinatorAcksGot;
Y_VERIFY(NewCheckpointCoordinatorAcksGot <= ActorsCount);
diff --git a/ydb/core/yq/libs/checkpointing/pending_checkpoint.h b/ydb/core/yq/libs/checkpointing/pending_checkpoint.h
index bc42b48018..3ce3ec1431 100644
--- a/ydb/core/yq/libs/checkpointing/pending_checkpoint.h
+++ b/ydb/core/yq/libs/checkpointing/pending_checkpoint.h
@@ -1,56 +1,56 @@
#pragma once
#include <ydb/core/yq/libs/checkpointing_common/defs.h>
-
-#include <library/cpp/actors/core/actor.h>
-
+
+#include <library/cpp/actors/core/actor.h>
+
namespace NYq {
-
-struct TPendingCheckpointStats {
- const TInstant CreatedAt = TInstant::Now();
- ui64 StateSize = 0;
-};
-
-class TPendingCheckpoint {
- THashSet<NActors::TActorId> NotYetAcknowledged;
- TPendingCheckpointStats Stats;
-
-public:
- explicit TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats = TPendingCheckpointStats());
-
- void Acknowledge(const NActors::TActorId& actorId);
-
- void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize);
-
- [[nodiscard]]
- bool GotAllAcknowledges() const;
-
- [[nodiscard]]
- size_t NotYetAcknowledgedCount() const;
-
- [[nodiscard]]
- const TPendingCheckpointStats& GetStats() const;
-};
-
-class TPendingRestoreCheckpoint {
-public:
- TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged);
-
- void Acknowledge(const NActors::TActorId& actorId);
-
- [[nodiscard]]
- bool GotAllAcknowledges() const;
-
- [[nodiscard]]
- size_t NotYetAcknowledgedCount() const;
-
-public:
- TCheckpointId CheckpointId;
- bool CommitAfterRestore;
-
-private:
- THashSet<NActors::TActorId> NotYetAcknowledged;
-};
-
+
+struct TPendingCheckpointStats {
+ const TInstant CreatedAt = TInstant::Now();
+ ui64 StateSize = 0;
+};
+
+class TPendingCheckpoint {
+ THashSet<NActors::TActorId> NotYetAcknowledged;
+ TPendingCheckpointStats Stats;
+
+public:
+ explicit TPendingCheckpoint(THashSet<NActors::TActorId> toBeAcknowledged, TPendingCheckpointStats stats = TPendingCheckpointStats());
+
+ void Acknowledge(const NActors::TActorId& actorId);
+
+ void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize);
+
+ [[nodiscard]]
+ bool GotAllAcknowledges() const;
+
+ [[nodiscard]]
+ size_t NotYetAcknowledgedCount() const;
+
+ [[nodiscard]]
+ const TPendingCheckpointStats& GetStats() const;
+};
+
+class TPendingRestoreCheckpoint {
+public:
+ TPendingRestoreCheckpoint(TCheckpointId checkpointId, bool commitAfterRestore, THashSet<NActors::TActorId> toBeAcknowledged);
+
+ void Acknowledge(const NActors::TActorId& actorId);
+
+ [[nodiscard]]
+ bool GotAllAcknowledges() const;
+
+ [[nodiscard]]
+ size_t NotYetAcknowledgedCount() const;
+
+public:
+ TCheckpointId CheckpointId;
+ bool CommitAfterRestore;
+
+private:
+ THashSet<NActors::TActorId> NotYetAcknowledged;
+};
+
class TPendingInitCoordinator {
public:
explicit TPendingInitCoordinator(size_t actorsCount)
diff --git a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
index ceb0a9faf5..77deb2ff61 100644
--- a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
+++ b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
@@ -2,226 +2,226 @@
#include <ydb/core/yq/libs/graph_params/proto/graph_params.pb.h>
#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/testlib/basics/helpers.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/actors/core/executor_pool_basic.h>
-#include <library/cpp/actors/core/scheduler_basic.h>
-
-namespace {
-
-using namespace NKikimr;
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+
+namespace {
+
+using namespace NKikimr;
using namespace NYq;
-
+
enum ETestGraphFlags : ui64 {
InputWithSource = 1,
SourceWithChannelInOneTask = 2,
};
NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) {
-
+
NYql::NDqProto::TReadyState result;
- auto* ingress = result.AddTask();
- ingress->SetId(1);
- auto* ingressOutput = ingress->AddOutputs();
- ingressOutput->AddChannels();
+ auto* ingress = result.AddTask();
+ ingress->SetId(1);
+ auto* ingressOutput = ingress->AddOutputs();
+ ingressOutput->AddChannels();
if (flags & ETestGraphFlags::InputWithSource) {
- auto* source = ingress->AddInputs()->MutableSource();
- source->SetType("PqSource");
+ auto* source = ingress->AddInputs()->MutableSource();
+ source->SetType("PqSource");
}
-
- auto* map = result.AddTask();
- map->SetId(2);
- auto* mapInput = map->AddInputs();
- mapInput->AddChannels();
- auto* mapOutput = map->AddOutputs();
- mapOutput->AddChannels();
+
+ auto* map = result.AddTask();
+ map->SetId(2);
+ auto* mapInput = map->AddInputs();
+ mapInput->AddChannels();
+ auto* mapOutput = map->AddOutputs();
+ mapOutput->AddChannels();
if (flags & ETestGraphFlags::SourceWithChannelInOneTask) {
- auto* source = map->AddInputs()->MutableSource();
- source->SetType("PqSource");
+ auto* source = map->AddInputs()->MutableSource();
+ source->SetType("PqSource");
}
-
- auto* egress = result.AddTask();
- egress->SetId(3);
- auto* egressInput = egress->AddInputs();
- egressInput->AddChannels();
-
- return result;
-}
-
-struct TTestBootstrap : public TTestActorRuntime {
+
+ auto* egress = result.AddTask();
+ egress->SetId(3);
+ auto* egressInput = egress->AddInputs();
+ egressInput->AddChannels();
+
+ return result;
+}
+
+struct TTestBootstrap : public TTestActorRuntime {
NYql::NDqProto::TReadyState GraphState;
- NConfig::TCheckpointCoordinatorConfig Settings;
- NActors::TActorId StorageProxy;
- NActors::TActorId CheckpointCoordinator;
+ NConfig::TCheckpointCoordinatorConfig Settings;
+ NActors::TActorId StorageProxy;
+ NActors::TActorId CheckpointCoordinator;
NActors::TActorId RunActor;
-
- NActors::TActorId IngressActor;
- NActors::TActorId MapActor;
- NActors::TActorId EgressActor;
-
- THashMap<TActorId, ui64> ActorToTask;
-
- NMonitoring::TDynamicCounterPtr Counters = new NMonitoring::TDynamicCounters();
-
+
+ NActors::TActorId IngressActor;
+ NActors::TActorId MapActor;
+ NActors::TActorId EgressActor;
+
+ THashMap<TActorId, ui64> ActorToTask;
+
+ NMonitoring::TDynamicCounterPtr Counters = new NMonitoring::TDynamicCounters();
+
explicit TTestBootstrap(ui64 graphFlags = 0)
: TTestActorRuntime(true)
, GraphState(BuildTestGraph(graphFlags))
{
- TAutoPtr<TAppPrepare> app = new TAppPrepare();
- Initialize(app->Unwrap());
- StorageProxy = AllocateEdgeActor();
+ TAutoPtr<TAppPrepare> app = new TAppPrepare();
+ Initialize(app->Unwrap());
+ StorageProxy = AllocateEdgeActor();
RunActor = AllocateEdgeActor();
- IngressActor = AllocateEdgeActor();
- MapActor = AllocateEdgeActor();
- EgressActor = AllocateEdgeActor();
-
+ IngressActor = AllocateEdgeActor();
+ MapActor = AllocateEdgeActor();
+ EgressActor = AllocateEdgeActor();
+
ActorIdToProto(IngressActor, GraphState.AddActorId());
ActorIdToProto(MapActor, GraphState.AddActorId());
ActorIdToProto(EgressActor, GraphState.AddActorId());
-
+
ActorToTask[IngressActor] = GraphState.GetTask()[0].GetId();
ActorToTask[MapActor] = GraphState.GetTask()[1].GetId();
ActorToTask[EgressActor] = GraphState.GetTask()[2].GetId();
-
- Settings = NConfig::TCheckpointCoordinatorConfig();
+
+ Settings = NConfig::TCheckpointCoordinatorConfig();
Settings.SetEnabled(true);
- Settings.SetCheckpointingPeriodMillis(TDuration::Hours(1).MilliSeconds());
- Settings.SetMaxInflight(1);
-
- SetLogPriority(NKikimrServices::STREAMS_CHECKPOINT_COORDINATOR, NLog::PRI_DEBUG);
-
+ Settings.SetCheckpointingPeriodMillis(TDuration::Hours(1).MilliSeconds());
+ Settings.SetMaxInflight(1);
+
+ SetLogPriority(NKikimrServices::STREAMS_CHECKPOINT_COORDINATOR, NLog::PRI_DEBUG);
+
CheckpointCoordinator = Register(MakeCheckpointCoordinator(TCoordinatorId("my-graph-id", 42), {}, StorageProxy, RunActor, Settings, Counters, NProto::TGraphParams()).Release());
WaitForBootstrap();
Send(new IEventHandle(CheckpointCoordinator, {}, new NYql::NDqs::TEvReadyState(std::move(GraphState))));
- EnableScheduleForActor(CheckpointCoordinator);
- }
+ EnableScheduleForActor(CheckpointCoordinator);
+ }
void WaitForBootstrap() {
NActors::TDispatchOptions options;
options.FinalEvents.emplace_back(NActors::TEvents::TSystem::Bootstrap, 1);
DispatchEvents(options);
}
-};
-} // namespace
-
+};
+} // namespace
+
namespace NYq {
-
-void MockRegisterCoordinatorResponseEvent(TTestBootstrap& bootstrap, NYql::TIssues issues = NYql::TIssues()) {
- bootstrap.Send(new IEventHandle(
- bootstrap.CheckpointCoordinator,
- bootstrap.StorageProxy,
+
+void MockRegisterCoordinatorResponseEvent(TTestBootstrap& bootstrap, NYql::TIssues issues = NYql::TIssues()) {
+ bootstrap.Send(new IEventHandle(
+ bootstrap.CheckpointCoordinator,
+ bootstrap.StorageProxy,
new TEvCheckpointStorage::TEvRegisterCoordinatorResponse(std::move(issues))));
-}
-
-void MockCheckpointsMetadataResponse(TTestBootstrap& bootstrap, NYql::TIssues issues = NYql::TIssues()) {
- bootstrap.Send(new IEventHandle(
- bootstrap.CheckpointCoordinator,
- bootstrap.StorageProxy,
- new TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse(TVector<TCheckpointMetadata>(), std::move(issues))));
-}
-
+}
+
+void MockCheckpointsMetadataResponse(TTestBootstrap& bootstrap, NYql::TIssues issues = NYql::TIssues()) {
+ bootstrap.Send(new IEventHandle(
+ bootstrap.CheckpointCoordinator,
+ bootstrap.StorageProxy,
+ new TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse(TVector<TCheckpointMetadata>(), std::move(issues))));
+}
+
void MockCreateCheckpointResponse(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, NYql::TIssues issues = NYql::TIssues()) {
- bootstrap.Send(new IEventHandle(
- bootstrap.CheckpointCoordinator,
- bootstrap.StorageProxy,
+ bootstrap.Send(new IEventHandle(
+ bootstrap.CheckpointCoordinator,
+ bootstrap.StorageProxy,
new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), "42")));
-}
-
+}
+
void MockNodeStateSavedEvent(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, TActorId& sender) {
- auto ev = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
- ev->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
- ev->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
- ev->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK);
- bootstrap.Send(new IEventHandle(
- bootstrap.CheckpointCoordinator,
- sender,
- ev.release()));
-}
-
+ auto ev = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
+ ev->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
+ ev->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
+ ev->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK);
+ bootstrap.Send(new IEventHandle(
+ bootstrap.CheckpointCoordinator,
+ sender,
+ ev.release()));
+}
+
void MockNodeStateSaveFailedEvent(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, TActorId& sender) {
- auto ev = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
- ev->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
- ev->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
- ev->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
- bootstrap.Send(new IEventHandle(
- bootstrap.CheckpointCoordinator,
- sender,
- ev.release()));
-}
-
+ auto ev = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
+ ev->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
+ ev->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
+ ev->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
+ bootstrap.Send(new IEventHandle(
+ bootstrap.CheckpointCoordinator,
+ sender,
+ ev.release()));
+}
+
void MockSetCheckpointPendingCommitStatusResponse(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, NYql::TIssues issues = NYql::TIssues()) {
- bootstrap.Send(new IEventHandle(
- bootstrap.CheckpointCoordinator,
- bootstrap.StorageProxy,
+ bootstrap.Send(new IEventHandle(
+ bootstrap.CheckpointCoordinator,
+ bootstrap.StorageProxy,
new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse(checkpointId, std::move(issues))));
-}
-
+}
+
void MockChangesCommittedEvent(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, TActorId& sender) {
- bootstrap.Send(new IEventHandle(
- bootstrap.CheckpointCoordinator,
- sender,
- new NYql::NDq::TEvDqCompute::TEvStateCommitted(checkpointId.SeqNo, checkpointId.CoordinatorGeneration, bootstrap.ActorToTask[sender])));
-}
-
+ bootstrap.Send(new IEventHandle(
+ bootstrap.CheckpointCoordinator,
+ sender,
+ new NYql::NDq::TEvDqCompute::TEvStateCommitted(checkpointId.SeqNo, checkpointId.CoordinatorGeneration, bootstrap.ActorToTask[sender])));
+}
+
void MockCompleteCheckpointResponse(TTestBootstrap& bootstrap, TCheckpointId& checkpointId, NYql::TIssues issues = NYql::TIssues()) {
- bootstrap.Send(new IEventHandle(
- bootstrap.CheckpointCoordinator,
- bootstrap.StorageProxy,
+ bootstrap.Send(new IEventHandle(
+ bootstrap.CheckpointCoordinator,
+ bootstrap.StorageProxy,
new TEvCheckpointStorage::TEvCompleteCheckpointResponse(checkpointId, std::move(issues))));
-}
-
-Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
+}
+
+Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
void ShouldTriggerCheckpointImpl(ui64 graphFlags) {
TTestBootstrap bootstrap(graphFlags);
-
- Cerr << "Waiting for TEvRegisterCoordinatorRequest (storage)" << Endl;
+
+ Cerr << "Waiting for TEvRegisterCoordinatorRequest (storage)" << Endl;
bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
- MockRegisterCoordinatorResponseEvent(bootstrap);
-
- Cerr << "Waiting for TEvGetCheckpointsMetadataRequest (storage)" << Endl;
- bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
- MockCheckpointsMetadataResponse(bootstrap);
-
- Cerr << "Waiting for TEvCreateCheckpointRequest (storage)" << Endl;
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+ MockRegisterCoordinatorResponseEvent(bootstrap);
+
+ Cerr << "Waiting for TEvGetCheckpointsMetadataRequest (storage)" << Endl;
+ bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest>(
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+ MockCheckpointsMetadataResponse(bootstrap);
+
+ Cerr << "Waiting for TEvCreateCheckpointRequest (storage)" << Endl;
auto updateState = bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvCreateCheckpointRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
-
- auto& checkpointId = updateState->Get()->CheckpointId;
- MockCreateCheckpointResponse(bootstrap, checkpointId);
-
- Cerr << "Waiting for TEvInjectCheckpointBarrier (ingress)" << Endl;
- bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvInjectCheckpoint>(
- bootstrap.IngressActor, TDuration::Seconds(10));
-
- MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.IngressActor);
- MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.MapActor);
- MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.EgressActor);
-
- Cerr << "Waiting for TEvSetCheckpointPendingCommitStatusRequest (storage)" << Endl;
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+
+ auto& checkpointId = updateState->Get()->CheckpointId;
+ MockCreateCheckpointResponse(bootstrap, checkpointId);
+
+ Cerr << "Waiting for TEvInjectCheckpointBarrier (ingress)" << Endl;
+ bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvInjectCheckpoint>(
+ bootstrap.IngressActor, TDuration::Seconds(10));
+
+ MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.IngressActor);
+ MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.MapActor);
+ MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.EgressActor);
+
+ Cerr << "Waiting for TEvSetCheckpointPendingCommitStatusRequest (storage)" << Endl;
bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
-
- MockSetCheckpointPendingCommitStatusResponse(bootstrap, checkpointId);
- Cerr << "Waiting for TEvCommitChanges (ingress)" << Endl;
- bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvCommitState>(bootstrap.IngressActor, TDuration::Seconds(10));
- Cerr << "Waiting for TEvCommitChanges (egress)" << Endl;
- bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvCommitState>(bootstrap.EgressActor, TDuration::Seconds(10));
-
- MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.IngressActor);
- MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.MapActor);
- MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.EgressActor);
-
- Cerr << "Waiting for TEvCompleteCheckpointRequest (storage)" << Endl;
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+
+ MockSetCheckpointPendingCommitStatusResponse(bootstrap, checkpointId);
+ Cerr << "Waiting for TEvCommitChanges (ingress)" << Endl;
+ bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvCommitState>(bootstrap.IngressActor, TDuration::Seconds(10));
+ Cerr << "Waiting for TEvCommitChanges (egress)" << Endl;
+ bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvCommitState>(bootstrap.EgressActor, TDuration::Seconds(10));
+
+ MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.IngressActor);
+ MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.MapActor);
+ MockChangesCommittedEvent(bootstrap, checkpointId, bootstrap.EgressActor);
+
+ Cerr << "Waiting for TEvCompleteCheckpointRequest (storage)" << Endl;
auto completed = bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvCompleteCheckpointRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
- UNIT_ASSERT(completed.Get() != nullptr);
- MockCompleteCheckpointResponse(bootstrap, checkpointId);
- }
-
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+ UNIT_ASSERT(completed.Get() != nullptr);
+ MockCompleteCheckpointResponse(bootstrap, checkpointId);
+ }
+
Y_UNIT_TEST(ShouldTriggerCheckpoint) {
ShouldTriggerCheckpointImpl(0);
}
@@ -238,39 +238,39 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
ShouldTriggerCheckpointImpl(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask);
}
- Y_UNIT_TEST(ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved) {
- TTestBootstrap bootstrap{ETestGraphFlags::InputWithSource};
-
- Cerr << "Waiting for TEvRegisterCoordinatorRequest (storage)" << Endl;
+ Y_UNIT_TEST(ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved) {
+ TTestBootstrap bootstrap{ETestGraphFlags::InputWithSource};
+
+ Cerr << "Waiting for TEvRegisterCoordinatorRequest (storage)" << Endl;
bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
- MockRegisterCoordinatorResponseEvent(bootstrap);
-
- Cerr << "Waiting for TEvGetCheckpointsMetadataRequest (storage)" << Endl;
- bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
- MockCheckpointsMetadataResponse(bootstrap);
-
- Cerr << "Waiting for TEvCreateCheckpointRequest (storage)" << Endl;
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+ MockRegisterCoordinatorResponseEvent(bootstrap);
+
+ Cerr << "Waiting for TEvGetCheckpointsMetadataRequest (storage)" << Endl;
+ bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest>(
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+ MockCheckpointsMetadataResponse(bootstrap);
+
+ Cerr << "Waiting for TEvCreateCheckpointRequest (storage)" << Endl;
auto updateState = bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvCreateCheckpointRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
- UNIT_ASSERT(updateState->Get()->NodeCount == 3);
-
- auto& checkpointId = updateState->Get()->CheckpointId;
- MockCreateCheckpointResponse(bootstrap, checkpointId);
-
- Cerr << "Waiting for TEvInjectCheckpointBarrier (ingress)" << Endl;
- bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvInjectCheckpoint>(bootstrap.IngressActor, TDuration::Seconds(10));
-
- MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.IngressActor);
- MockNodeStateSaveFailedEvent(bootstrap, checkpointId, bootstrap.MapActor);
- MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.EgressActor);
-
- Cerr << "Waiting for TEvCompleteCheckpointRequest (storage)" << Endl;
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+ UNIT_ASSERT(updateState->Get()->NodeCount == 3);
+
+ auto& checkpointId = updateState->Get()->CheckpointId;
+ MockCreateCheckpointResponse(bootstrap, checkpointId);
+
+ Cerr << "Waiting for TEvInjectCheckpointBarrier (ingress)" << Endl;
+ bootstrap.GrabEdgeEvent<NYql::NDq::TEvDqCompute::TEvInjectCheckpoint>(bootstrap.IngressActor, TDuration::Seconds(10));
+
+ MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.IngressActor);
+ MockNodeStateSaveFailedEvent(bootstrap, checkpointId, bootstrap.MapActor);
+ MockNodeStateSavedEvent(bootstrap, checkpointId, bootstrap.EgressActor);
+
+ Cerr << "Waiting for TEvCompleteCheckpointRequest (storage)" << Endl;
auto completed = bootstrap.GrabEdgeEvent<TEvCheckpointStorage::TEvAbortCheckpointRequest>(
- bootstrap.StorageProxy, TDuration::Seconds(10));
- UNIT_ASSERT(completed.Get() != nullptr);
- }
-}
-
+ bootstrap.StorageProxy, TDuration::Seconds(10));
+ UNIT_ASSERT(completed.Get() != nullptr);
+ }
+}
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpointing/ut/ya.make b/ydb/core/yq/libs/checkpointing/ut/ya.make
index cf9700d584..f9a3b54437 100644
--- a/ydb/core/yq/libs/checkpointing/ut/ya.make
+++ b/ydb/core/yq/libs/checkpointing/ut/ya.make
@@ -1,20 +1,20 @@
UNITTEST_FOR(ydb/core/yq/libs/checkpointing)
-
+
OWNER(g:yq)
-
-SRCS(
- checkpoint_coordinator_ut.cpp
-)
-
-PEERDIR(
- library/cpp/testing/unittest
+
+SRCS(
+ checkpoint_coordinator_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/testing/unittest
ydb/core/testlib/actors
ydb/core/testlib/basics
ydb/core/yq/libs/checkpointing
-)
-
-SIZE(MEDIUM)
-
-YQL_LAST_ABI_VERSION()
-
+)
+
+SIZE(MEDIUM)
+
+YQL_LAST_ABI_VERSION()
+
END()
diff --git a/ydb/core/yq/libs/checkpointing/utils.cpp b/ydb/core/yq/libs/checkpointing/utils.cpp
index 22c6323214..63d427fa7c 100644
--- a/ydb/core/yq/libs/checkpointing/utils.cpp
+++ b/ydb/core/yq/libs/checkpointing/utils.cpp
@@ -1,5 +1,5 @@
-#include "utils.h"
-
+#include "utils.h"
+
namespace NYq {
bool IsIngress(const NYql::NDqProto::TDqTask& task) {
@@ -10,8 +10,8 @@ bool IsIngress(const NYql::NDqProto::TDqTask& task) {
}
}
return true;
-}
-
+}
+
bool IsEgress(const NYql::NDqProto::TDqTask& task) {
for (const auto& output : task.GetOutputs()) {
if (output.HasSink()) {
@@ -19,11 +19,11 @@ bool IsEgress(const NYql::NDqProto::TDqTask& task) {
}
}
return false;
-}
-
+}
+
bool HasState(const NYql::NDqProto::TDqTask& task) {
- Y_UNUSED(task);
- return true;
-}
+ Y_UNUSED(task);
+ return true;
+}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpointing/utils.h b/ydb/core/yq/libs/checkpointing/utils.h
index d0540804c7..b664554d50 100644
--- a/ydb/core/yq/libs/checkpointing/utils.h
+++ b/ydb/core/yq/libs/checkpointing/utils.h
@@ -1,13 +1,13 @@
-#pragma once
-
+#pragma once
+
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
-
+
namespace NYq {
-
-bool IsIngress(const NYql::NDqProto::TDqTask& task);
-
-bool IsEgress(const NYql::NDqProto::TDqTask& task);
-
-bool HasState(const NYql::NDqProto::TDqTask& task);
-
+
+bool IsIngress(const NYql::NDqProto::TDqTask& task);
+
+bool IsEgress(const NYql::NDqProto::TDqTask& task);
+
+bool HasState(const NYql::NDqProto::TDqTask& task);
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpointing/ya.make b/ydb/core/yq/libs/checkpointing/ya.make
index 8a397f67ef..c16cc139ca 100644
--- a/ydb/core/yq/libs/checkpointing/ya.make
+++ b/ydb/core/yq/libs/checkpointing/ya.make
@@ -1,21 +1,21 @@
-OWNER(
+OWNER(
g:yq
-)
-
-LIBRARY()
-
-SRCS(
- checkpoint_coordinator.cpp
- checkpoint_coordinator.h
- checkpoint_id_generator.cpp
- checkpoint_id_generator.h
- pending_checkpoint.cpp
- pending_checkpoint.h
- utils.cpp
- utils.h
-)
-
-PEERDIR(
+)
+
+LIBRARY()
+
+SRCS(
+ checkpoint_coordinator.cpp
+ checkpoint_coordinator.h
+ checkpoint_id_generator.cpp
+ checkpoint_id_generator.h
+ pending_checkpoint.cpp
+ pending_checkpoint.h
+ utils.cpp
+ utils.h
+)
+
+PEERDIR(
library/cpp/actors/core
ydb/core/yq/libs/actors/logging
ydb/core/yq/libs/checkpointing_common
@@ -24,16 +24,16 @@ PEERDIR(
ydb/library/yql/dq/proto
ydb/library/yql/dq/state
ydb/library/yql/providers/dq/api/protos
-)
-
-YQL_LAST_ABI_VERSION()
-
-END()
-
-RECURSE(
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE(
events
)
RECURSE_FOR_TESTS(
- ut
-)
+ ut
+)
diff --git a/ydb/core/yq/libs/checkpointing_common/defs.cpp b/ydb/core/yq/libs/checkpointing_common/defs.cpp
index e487b6542c..5a8e059657 100644
--- a/ydb/core/yq/libs/checkpointing_common/defs.cpp
+++ b/ydb/core/yq/libs/checkpointing_common/defs.cpp
@@ -14,12 +14,12 @@ TString TCoordinatorId::ToString() const {
void TCoordinatorId::PrintTo(IOutputStream& out) const
{
- out << GraphId << "." << Generation;
+ out << GraphId << "." << Generation;
}
size_t TCheckpointIdHash::operator ()(const TCheckpointId& checkpointId)
{
- return MultiHash(checkpointId.CoordinatorGeneration, checkpointId.SeqNo);
+ return MultiHash(checkpointId.CoordinatorGeneration, checkpointId.SeqNo);
}
} // namespace NYq
@@ -39,5 +39,5 @@ void Out<NYq::TCheckpointId>(
IOutputStream& out,
const NYq::TCheckpointId& checkpointId)
{
- out << checkpointId.CoordinatorGeneration << ":" << checkpointId.SeqNo;
+ out << checkpointId.CoordinatorGeneration << ":" << checkpointId.SeqNo;
}
diff --git a/ydb/core/yq/libs/checkpointing_common/defs.h b/ydb/core/yq/libs/checkpointing_common/defs.h
index c57d1c4ff4..d67b50e62d 100644
--- a/ydb/core/yq/libs/checkpointing_common/defs.h
+++ b/ydb/core/yq/libs/checkpointing_common/defs.h
@@ -27,22 +27,22 @@ using TCoordinators = TVector<TCoordinatorId>;
////////////////////////////////////////////////////////////////////////////////
struct TCheckpointId {
- ui64 CoordinatorGeneration = 0;
- ui64 SeqNo = 0;
+ ui64 CoordinatorGeneration = 0;
+ ui64 SeqNo = 0;
- TCheckpointId(ui64 gen, ui64 SeqNo)
- : CoordinatorGeneration(gen)
- , SeqNo(SeqNo)
+ TCheckpointId(ui64 gen, ui64 SeqNo)
+ : CoordinatorGeneration(gen)
+ , SeqNo(SeqNo)
{
}
bool operator ==(const TCheckpointId& rhs) const {
- return CoordinatorGeneration == rhs.CoordinatorGeneration && SeqNo == rhs.SeqNo;
+ return CoordinatorGeneration == rhs.CoordinatorGeneration && SeqNo == rhs.SeqNo;
}
bool operator <(const TCheckpointId& rhs) const {
- return CoordinatorGeneration < rhs.CoordinatorGeneration ||
- (CoordinatorGeneration == rhs.CoordinatorGeneration && SeqNo < rhs.SeqNo);
+ return CoordinatorGeneration < rhs.CoordinatorGeneration ||
+ (CoordinatorGeneration == rhs.CoordinatorGeneration && SeqNo < rhs.SeqNo);
}
};
@@ -55,7 +55,7 @@ struct TCheckpointIdHash {
////////////////////////////////////////////////////////////////////////////////
enum class ECheckpointStatus {
- Unspecified,
+ Unspecified,
Pending,
PendingCommit,
Completed,
diff --git a/ydb/core/yq/libs/config/protos/yq_config.proto b/ydb/core/yq/libs/config/protos/yq_config.proto
index 841b816821..f0a28cfcb0 100644
--- a/ydb/core/yq/libs/config/protos/yq_config.proto
+++ b/ydb/core/yq/libs/config/protos/yq_config.proto
@@ -29,7 +29,7 @@ message TConfig {
TCommonConfig Common = 2;
TControlPlaneStorageConfig ControlPlaneStorage = 3;
TControlPlaneProxyConfig ControlPlaneProxy = 4;
- NKikimrProto.NFolderService.TFolderServiceConfig FolderService = 5;
+ NKikimrProto.NFolderService.TFolderServiceConfig FolderService = 5;
TPrivateApiConfig PrivateApi = 6;
TTokenAccessorConfig TokenAccessor = 7;
TDbPoolConfig DbPool = 8;
diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
index 53838351b4..4146ba80f2 100644
--- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -138,11 +138,11 @@ public:
CPP_LOG_T("Request actor. Actor id: " << SelfId());
Become(&TRequestActor::StateFunc, GetDuration(Config.GetRequestTimeout(), TDuration::Seconds(30)), new NActors::TEvents::TEvWakeup());
if constexpr (ResolveFolder) {
- auto request = std::make_unique<NKikimr::NFolderService::TEvFolderService::TEvGetFolderRequest>();
- request->Request.set_folder_id(FolderId);
- request->Token = Token;
+ auto request = std::make_unique<NKikimr::NFolderService::TEvFolderService::TEvGetFolderRequest>();
+ request->Request.set_folder_id(FolderId);
+ request->Token = Token;
ResolveFolderCounters->InFly->Inc();
- Send(NKikimr::NFolderService::FolderServiceActorId(), request.release(), 0, 0);
+ Send(NKikimr::NFolderService::FolderServiceActorId(), request.release(), 0, 0);
} else {
Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, Permissions), 0, Cookie);
}
@@ -151,7 +151,7 @@ public:
STRICT_STFUNC(StateFunc,
cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout);
hFunc(TResponse, Handle);
- hFunc(NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse, Handle);
+ hFunc(NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse, Handle);
)
void HandleTimeout() {
@@ -171,20 +171,20 @@ public:
PassAway();
}
- void Handle(NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse::TPtr& ev) {
+ void Handle(NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse::TPtr& ev) {
ResolveFolderCounters->InFly->Dec();
ResolveFolderCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
- const auto& response = ev->Get()->Response;
- TString errorMessage;
+ const auto& response = ev->Get()->Response;
+ TString errorMessage;
- const auto& status = ev->Get()->Status;
- if (!status.Ok() || !ev->Get()->Response.has_folder()) {
+ const auto& status = ev->Get()->Status;
+ if (!status.Ok() || !ev->Get()->Response.has_folder()) {
ResolveFolderCounters->Error->Inc();
- errorMessage = "Msg: " + status.Msg + " Details: " + status.Details + " Code: " + ToString(status.GRpcStatusCode) + " InternalError: " + ToString(status.InternalError);
- CPP_LOG_E(errorMessage);
+ errorMessage = "Msg: " + status.Msg + " Details: " + status.Details + " Code: " + ToString(status.GRpcStatusCode) + " InternalError: " + ToString(status.InternalError);
+ CPP_LOG_E(errorMessage);
NYql::TIssues issues;
- NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Resolve folder error");
+ NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Resolve folder error");
issues.AddIssue(issue);
Counters->Error->Inc();
Counters->Timeout->Inc();
@@ -196,7 +196,7 @@ public:
}
ResolveFolderCounters->Ok->Inc();
- CloudId = response.folder().cloud_id();
+ CloudId = response.folder().cloud_id();
CPP_LOG_T("Cloud id: " << CloudId << " Folder id: " << FolderId);
if constexpr (ResolveFolder) {
Send(ServiceId, new TRequest(Scope, RequestProto, User, Token, CloudId, Permissions), 0, Cookie);
diff --git a/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
index 3d0c5bb70b..5116f14855 100644
--- a/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
+++ b/ydb/core/yq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp
@@ -18,7 +18,7 @@
#include <ydb/core/base/path.h>
#include <ydb/library/folder_service/folder_service.h>
-#include <ydb/library/folder_service/mock/mock_folder_service.h>
+#include <ydb/library/folder_service/mock/mock_folder_service.h>
#include <util/system/env.h>
@@ -356,9 +356,9 @@ private:
ControlPlaneProxyActorId(),
TActorSetupCmd(controlPlaneProxy, TMailboxType::Simple, 0));
- auto folderService = NKikimr::NFolderService::CreateMockFolderServiceActor(NKikimrProto::NFolderService::TFolderServiceConfig{});
+ auto folderService = NKikimr::NFolderService::CreateMockFolderServiceActor(NKikimrProto::NFolderService::TFolderServiceConfig{});
runtime->AddLocalService(
- NKikimr::NFolderService::FolderServiceActorId(),
+ NKikimr::NFolderService::FolderServiceActorId(),
TActorSetupCmd(folderService, TMailboxType::Simple, 0),
0
);
diff --git a/ydb/core/yq/libs/control_plane_proxy/ut/ya.make b/ydb/core/yq/libs/control_plane_proxy/ut/ya.make
index 1550392ebc..0171764aa4 100644
--- a/ydb/core/yq/libs/control_plane_proxy/ut/ya.make
+++ b/ydb/core/yq/libs/control_plane_proxy/ut/ya.make
@@ -10,7 +10,7 @@ PEERDIR(
ydb/core/yq/libs/control_plane_storage
ydb/core/yq/libs/test_connection
ydb/library/folder_service
- ydb/library/folder_service/mock
+ ydb/library/folder_service/mock
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp b/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp
index 8274a88589..ec3156870b 100644
--- a/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp
@@ -6,13 +6,13 @@ TRequestCounters::TRequestCounters(const TString& name)
: Name(name) {}
void TRequestCounters::Register(const NMonitoring::TDynamicCounterPtr& counters) {
- auto requestCounters = counters->GetSubgroup("request", Name);
- InFly = requestCounters->GetCounter("InFly", false);
- Ok = requestCounters->GetCounter("Ok", true);
- Error = requestCounters->GetCounter("Error", true);
- Retry = requestCounters->GetCounter("Retry", true);
- LatencyMs = requestCounters->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
- Issues = requestCounters->GetSubgroup("subcomponent", "Issues");
+ auto requestCounters = counters->GetSubgroup("request", Name);
+ InFly = requestCounters->GetCounter("InFly", false);
+ Ok = requestCounters->GetCounter("Ok", true);
+ Error = requestCounters->GetCounter("Error", true);
+ Retry = requestCounters->GetCounter("Retry", true);
+ LatencyMs = requestCounters->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+ Issues = requestCounters->GetSubgroup("subcomponent", "Issues");
}
NMonitoring::IHistogramCollectorPtr TRequestCounters::GetLatencyHistogramBuckets() {
diff --git a/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h b/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h
index 58d3c56b27..095656157d 100644
--- a/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h
+++ b/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.h
@@ -14,7 +14,7 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr Error;
NMonitoring::TDynamicCounters::TCounterPtr Retry;
NMonitoring::THistogramPtr LatencyMs;
- NMonitoring::TDynamicCounterPtr Issues;
+ NMonitoring::TDynamicCounterPtr Issues;
explicit TRequestCounters(const TString& name);
diff --git a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto
index e6ebff0c84..2bc7aded3b 100644
--- a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto
+++ b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto
@@ -24,7 +24,7 @@ message QueryInternal {
YandexQuery.QueryAction action = 7;
string ast = 8; // deprected and should not be used, will be removed in future
ExecuteMode execute_mode = 9;
- StateLoadMode state_load_mode = 10;
+ StateLoadMode state_load_mode = 10;
string cloud_id = 11;
repeated Yq.Private.TopicConsumer created_topic_consumers = 12;
repeated bytes dq_graph = 13;
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index 222ebb6e8e..75cb62bb8d 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -236,7 +236,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateNodesTable(TActorSystem* as)
.AddNullableColumn(INTERCONNECT_PORT_COLUMN_NAME, EPrimitiveType::Uint32)
.AddNullableColumn(NODE_ADDRESS_COLUMN_NAME, EPrimitiveType::String)
.SetTtlSettings(EXPIRE_AT_COLUMN_NAME)
- .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, NODE_ID_COLUMN_NAME})
+ .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, NODE_ID_COLUMN_NAME})
.Build();
return YdbConnection->Client.RetryOperation(
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index 1c8cda7ee7..0387e3d8cb 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -637,12 +637,12 @@ private:
event->DebugInfo = debugInfo;
actorSystem->Send(new IEventHandle(ev->Sender, self, event.release(), 0, ev->Cookie));
requestCounters->Error->Inc();
- for (const auto& issue : issues) {
- NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) {
- Y_UNUSED(level);
- requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc();
- });
- }
+ for (const auto& issue : issues) {
+ NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) {
+ Y_UNUSED(level);
+ requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc();
+ });
+ }
} else {
CPS_LOG_AS_T(*actorSystem, name << ": " << request.DebugString() << " success");
auto event = std::make_unique<ResponseEvent>(result);
@@ -708,12 +708,12 @@ private:
event->DebugInfo = debugInfo;
actorSystem->Send(new IEventHandle(ev->Sender, self, event.release(), 0, ev->Cookie));
requestCounters->Error->Inc();
- for (const auto& issue : issues) {
- NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) {
- Y_UNUSED(level);
- requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc();
- });
- }
+ for (const auto& issue : issues) {
+ NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) {
+ Y_UNUSED(level);
+ requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc();
+ });
+ }
} else {
CPS_LOG_AS_T(*actorSystem, name << ": " << request.DebugString() << " success");
auto event = std::make_unique<ResponseEvent>(result, auditDetails);
@@ -774,12 +774,12 @@ private:
event->DebugInfo = debugInfo;
actorSystem->Send(new IEventHandle(ev->Sender, self, event.release(), 0, ev->Cookie));
requestCounters->Error->Inc();
- for (const auto& issue : issues) {
- NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) {
- Y_UNUSED(level);
- requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc();
- });
- }
+ for (const auto& issue : issues) {
+ NYql::WalkThroughIssues(issue, true, [&requestCounters](const NYql::TIssue& err, ui16 level) {
+ Y_UNUSED(level);
+ requestCounters->Issues->GetCounter(ToString(err.GetCode()), true)->Inc();
+ });
+ }
} else {
CPS_LOG_AS_T(*actorSystem, name << ": success");
auto event = std::unique_ptr<ResponseEvent>(new ResponseEvent(std::make_from_tuple<ResponseEvent>(result)));
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index 398725f7f4..988c8c3963 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -193,7 +193,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
if (query.ByteSizeLong() > Config.Proto.GetMaxRequestSize()) {
ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "Query data is not placed in the table. Please shorten your request";
}
-
+
if (queryInternal.ByteSizeLong() > Config.Proto.GetMaxRequestSize()) {
ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "Query internal data is not placed in the table. Please reduce the number of connections and bindings";
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index d66ac202bf..2f47844e4c 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -102,7 +102,7 @@ void Init(
const TAppData* appData,
const TString& tenant,
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
- const IYqSharedResources::TPtr& iyqSharedResources,
+ const IYqSharedResources::TPtr& iyqSharedResources,
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig)>& auditServiceFactory,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
@@ -145,8 +145,8 @@ void Init(
// if not enabled then stub
{
- auto folderService = folderServiceFactory(protoConfig.GetFolderService());
- actorRegistrator(NKikimr::NFolderService::FolderServiceActorId(), folderService);
+ auto folderService = folderServiceFactory(protoConfig.GetFolderService());
+ actorRegistrator(NKikimr::NFolderService::FolderServiceActorId(), folderService);
}
if (protoConfig.GetCheckpointCoordinator().GetEnabled()) {
diff --git a/ydb/core/yq/libs/init/init.h b/ydb/core/yq/libs/init/init.h
index fa26651f18..d44711f959 100644
--- a/ydb/core/yq/libs/init/init.h
+++ b/ydb/core/yq/libs/init/init.h
@@ -10,7 +10,7 @@
#include <ydb/library/folder_service/proto/config.pb.h>
#include <ydb/core/yq/libs/config/protos/audit.pb.h>
-
+
#include <ydb/library/yql/providers/pq/cm_client/interface/client.h>
#include <library/cpp/actors/core/actor.h>
@@ -33,11 +33,11 @@ void Init(
const NKikimr::TAppData* appData,
const TString& tenant,
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
- const IYqSharedResources::TPtr& yqSharedResources,
+ const IYqSharedResources::TPtr& yqSharedResources,
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig)>& auditServiceFactory,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const ui32& icPort
-);
+);
} // NYq
diff --git a/ydb/core/yq/libs/ya.make b/ydb/core/yq/libs/ya.make
index ccc88a55b6..1b1cd0eed6 100644
--- a/ydb/core/yq/libs/ya.make
+++ b/ydb/core/yq/libs/ya.make
@@ -4,8 +4,8 @@ RECURSE(
actors
audit
checkpoint_storage
- checkpointing
- checkpointing_common
+ checkpointing
+ checkpointing_common
common
config
control_plane_proxy
diff --git a/ydb/library/folder_service/events.h b/ydb/library/folder_service/events.h
index 71329b1d4f..53255a11c8 100644
--- a/ydb/library/folder_service/events.h
+++ b/ydb/library/folder_service/events.h
@@ -1,34 +1,34 @@
-#pragma once
-
+#pragma once
+
#include <ydb/library/folder_service/proto/folder_service.pb.h>
#include <ydb/core/base/events.h>
-
-#include <library/cpp/grpc/client/grpc_client_low.h>
-
-namespace NKikimr::NFolderService {
-
-struct TEvFolderService {
- enum EEv {
- // requests
- EvGetFolderRequest = EventSpaceBegin(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER),
-
- // replies
- EvGetFolderResponse = EventSpaceBegin(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER) + 512,
-
- EvEnd
- };
-
- static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER)");
-
- struct TEvGetFolderRequest : TEventLocal<TEvGetFolderRequest, EvGetFolderRequest> {
- NKikimrProto::NFolderService::GetFolderRequest Request;
- TString Token;
- TString RequestId;
- };
-
- struct TEvGetFolderResponse : TEventLocal<TEvGetFolderResponse, EvGetFolderResponse> {
- NKikimrProto::NFolderService::GetFolderResponse Response;
- NGrpc::TGrpcStatus Status;
- };
-};
-} // namespace NKikimr::NFolderService
+
+#include <library/cpp/grpc/client/grpc_client_low.h>
+
+namespace NKikimr::NFolderService {
+
+struct TEvFolderService {
+ enum EEv {
+ // requests
+ EvGetFolderRequest = EventSpaceBegin(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER),
+
+ // replies
+ EvGetFolderResponse = EventSpaceBegin(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER) + 512,
+
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_FOLDER_SERVICE_ADAPTER)");
+
+ struct TEvGetFolderRequest : TEventLocal<TEvGetFolderRequest, EvGetFolderRequest> {
+ NKikimrProto::NFolderService::GetFolderRequest Request;
+ TString Token;
+ TString RequestId;
+ };
+
+ struct TEvGetFolderResponse : TEventLocal<TEvGetFolderResponse, EvGetFolderResponse> {
+ NKikimrProto::NFolderService::GetFolderResponse Response;
+ NGrpc::TGrpcStatus Status;
+ };
+};
+} // namespace NKikimr::NFolderService
diff --git a/ydb/library/folder_service/folder_service.cpp b/ydb/library/folder_service/folder_service.cpp
index 1189c4005c..b9d770d1fa 100644
--- a/ydb/library/folder_service/folder_service.cpp
+++ b/ydb/library/folder_service/folder_service.cpp
@@ -1,10 +1,10 @@
-#include "folder_service.h"
-
-namespace NKikimr::NFolderService {
-
-NActors::TActorId FolderServiceActorId() {
- constexpr TStringBuf name = "FLDRSRVS";
- return NActors::TActorId(0, name);
-}
-
-} // namespace NKikimr::NFolderService \ No newline at end of file
+#include "folder_service.h"
+
+namespace NKikimr::NFolderService {
+
+NActors::TActorId FolderServiceActorId() {
+ constexpr TStringBuf name = "FLDRSRVS";
+ return NActors::TActorId(0, name);
+}
+
+} // namespace NKikimr::NFolderService \ No newline at end of file
diff --git a/ydb/library/folder_service/folder_service.h b/ydb/library/folder_service/folder_service.h
index 142118c3c9..d1d4cf0305 100644
--- a/ydb/library/folder_service/folder_service.h
+++ b/ydb/library/folder_service/folder_service.h
@@ -1,12 +1,12 @@
-#pragma once
-
+#pragma once
+
#include <ydb/library/folder_service/proto/config.pb.h>
#include <library/cpp/actors/core/actor.h>
-namespace NKikimr::NFolderService {
+namespace NKikimr::NFolderService {
-NActors::TActorId FolderServiceActorId();
+NActors::TActorId FolderServiceActorId();
-NActors::IActor* CreateFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig& config);
+NActors::IActor* CreateFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig& config);
-} // namespace NKikimr::NFolderService
+} // namespace NKikimr::NFolderService
diff --git a/ydb/library/folder_service/mock/mock_folder_service.cpp b/ydb/library/folder_service/mock/mock_folder_service.cpp
index 68f6d1d64d..c08ba6f5d9 100644
--- a/ydb/library/folder_service/mock/mock_folder_service.cpp
+++ b/ydb/library/folder_service/mock/mock_folder_service.cpp
@@ -1,40 +1,40 @@
#include <ydb/library/folder_service/mock/mock_folder_service.h>
#include <ydb/library/folder_service/events.h>
-#include <library/cpp/actors/core/hfunc.h>
-
-namespace NKikimr::NFolderService {
-
-class TFolderServiceMock
- : public NActors::TActor<TFolderServiceMock> {
- using TThis = TFolderServiceMock;
- using TBase = NActors::TActor<TFolderServiceMock>;
-
- using TEvListFolderRequest = NKikimr::NFolderService::TEvFolderService::TEvGetFolderRequest;
- using TEvListFolderResponse = NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse;
-
-public:
- TFolderServiceMock()
- : TBase(&TThis::StateWork) {
- }
-
- void Handle(TEvListFolderRequest::TPtr& ev) {
- auto result = std::make_unique<TEvListFolderResponse>();
- auto* fakeFolder = result->Response.mutable_folder();
- fakeFolder->set_id(ev.Get()->Get()->Request.folder_id());
- fakeFolder->set_cloud_id("mock_cloud");
- result->Status = NGrpc::TGrpcStatus();
- Send(ev->Sender, result.release());
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvListFolderRequest, Handle)
- cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway)
- }
- }
-};
-
-NActors::IActor* CreateMockFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig&) {
- return new TFolderServiceMock();
-}
-} // namespace NKikimr::NFolderService
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NKikimr::NFolderService {
+
+class TFolderServiceMock
+ : public NActors::TActor<TFolderServiceMock> {
+ using TThis = TFolderServiceMock;
+ using TBase = NActors::TActor<TFolderServiceMock>;
+
+ using TEvListFolderRequest = NKikimr::NFolderService::TEvFolderService::TEvGetFolderRequest;
+ using TEvListFolderResponse = NKikimr::NFolderService::TEvFolderService::TEvGetFolderResponse;
+
+public:
+ TFolderServiceMock()
+ : TBase(&TThis::StateWork) {
+ }
+
+ void Handle(TEvListFolderRequest::TPtr& ev) {
+ auto result = std::make_unique<TEvListFolderResponse>();
+ auto* fakeFolder = result->Response.mutable_folder();
+ fakeFolder->set_id(ev.Get()->Get()->Request.folder_id());
+ fakeFolder->set_cloud_id("mock_cloud");
+ result->Status = NGrpc::TGrpcStatus();
+ Send(ev->Sender, result.release());
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvListFolderRequest, Handle)
+ cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway)
+ }
+ }
+};
+
+NActors::IActor* CreateMockFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig&) {
+ return new TFolderServiceMock();
+}
+} // namespace NKikimr::NFolderService
diff --git a/ydb/library/folder_service/mock/mock_folder_service.h b/ydb/library/folder_service/mock/mock_folder_service.h
index 301ad2f551..9fc55ad95c 100644
--- a/ydb/library/folder_service/mock/mock_folder_service.h
+++ b/ydb/library/folder_service/mock/mock_folder_service.h
@@ -1,10 +1,10 @@
-#pragma once
-
+#pragma once
+
#include <ydb/library/folder_service/proto/config.pb.h>
-
-#include <library/cpp/actors/core/actor.h>
-
-namespace NKikimr::NFolderService {
-
-NActors::IActor* CreateMockFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig& config);
-}
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NKikimr::NFolderService {
+
+NActors::IActor* CreateMockFolderServiceActor(const NKikimrProto::NFolderService::TFolderServiceConfig& config);
+}
diff --git a/ydb/library/folder_service/mock/ya.make b/ydb/library/folder_service/mock/ya.make
index b502c4306a..bd42486475 100644
--- a/ydb/library/folder_service/mock/ya.make
+++ b/ydb/library/folder_service/mock/ya.make
@@ -1,17 +1,17 @@
OWNER(g:yq)
-
-LIBRARY()
-
-SRCS(
- mock_folder_service.cpp
-)
-
-PEERDIR(
- library/cpp/actors/core
+
+LIBRARY()
+
+SRCS(
+ mock_folder_service.cpp
+)
+
+PEERDIR(
+ library/cpp/actors/core
ydb/library/folder_service
ydb/library/folder_service/proto
-)
-
-YQL_LAST_ABI_VERSION()
-
-END()
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/folder_service/proto/config.proto b/ydb/library/folder_service/proto/config.proto
index 06981810a3..94fadb0f41 100644
--- a/ydb/library/folder_service/proto/config.proto
+++ b/ydb/library/folder_service/proto/config.proto
@@ -1,9 +1,9 @@
-syntax = "proto3";
-
-package NKikimrProto.NFolderService;
-
-message TFolderServiceConfig {
- bool Enable = 1;
- string Endpoint = 2;
- string PathToRootCA = 3;
-}
+syntax = "proto3";
+
+package NKikimrProto.NFolderService;
+
+message TFolderServiceConfig {
+ bool Enable = 1;
+ string Endpoint = 2;
+ string PathToRootCA = 3;
+}
diff --git a/ydb/library/folder_service/proto/folder_service.proto b/ydb/library/folder_service/proto/folder_service.proto
index bb4fb4c2a6..39f77f34ca 100644
--- a/ydb/library/folder_service/proto/folder_service.proto
+++ b/ydb/library/folder_service/proto/folder_service.proto
@@ -1,36 +1,36 @@
-syntax = "proto3";
-
-import "google/protobuf/timestamp.proto";
-
-package NKikimrProto.NFolderService;
-
-// copied from cloud/bitbucket/private-api/yandex/cloud/priv/resourcemanager/v1/folder_service.proto
-
-message Folder {
-
- enum Status {
- STATUS_UNSPECIFIED = 0;
- ACTIVE = 1;
- DELETING = 2;
- DELETED = 3;
- PENDING_DELETION = 4;
- }
-
- string id = 1;
- string cloud_id = 2;
-
- google.protobuf.Timestamp created_at = 3;
- string name = 4;
- string description = 5;
- map<string, string> labels = 6;
-
- Status status = 7;
-}
-
-message GetFolderRequest {
- string folder_id = 1;
-}
-
-message GetFolderResponse {
- Folder folder = 1;
-}
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+
+package NKikimrProto.NFolderService;
+
+// copied from cloud/bitbucket/private-api/yandex/cloud/priv/resourcemanager/v1/folder_service.proto
+
+message Folder {
+
+ enum Status {
+ STATUS_UNSPECIFIED = 0;
+ ACTIVE = 1;
+ DELETING = 2;
+ DELETED = 3;
+ PENDING_DELETION = 4;
+ }
+
+ string id = 1;
+ string cloud_id = 2;
+
+ google.protobuf.Timestamp created_at = 3;
+ string name = 4;
+ string description = 5;
+ map<string, string> labels = 6;
+
+ Status status = 7;
+}
+
+message GetFolderRequest {
+ string folder_id = 1;
+}
+
+message GetFolderResponse {
+ Folder folder = 1;
+}
diff --git a/ydb/library/folder_service/proto/ya.make b/ydb/library/folder_service/proto/ya.make
index 1b1c5dc6bb..154c56f8a7 100644
--- a/ydb/library/folder_service/proto/ya.make
+++ b/ydb/library/folder_service/proto/ya.make
@@ -1,12 +1,12 @@
OWNER(g:yq)
-
-PROTO_LIBRARY()
-
-SRCS(
- config.proto
- folder_service.proto
-)
-
-EXCLUDE_TAGS(GO_PROTO)
-
-END()
+
+PROTO_LIBRARY()
+
+SRCS(
+ config.proto
+ folder_service.proto
+)
+
+EXCLUDE_TAGS(GO_PROTO)
+
+END()
diff --git a/ydb/library/folder_service/ya.make b/ydb/library/folder_service/ya.make
index 5ca60e0271..897e56bf49 100644
--- a/ydb/library/folder_service/ya.make
+++ b/ydb/library/folder_service/ya.make
@@ -1,22 +1,22 @@
OWNER(g:yq)
-
-LIBRARY()
-
-SRCS(
+
+LIBRARY()
+
+SRCS(
events.h
- folder_service.cpp
+ folder_service.cpp
folder_service.h
-)
-
-PEERDIR(
- library/cpp/actors/core
+)
+
+PEERDIR(
+ library/cpp/actors/core
ydb/core/base
ydb/library/folder_service/proto
-)
-
-END()
-
-RECURSE(
- mock
- proto
-)
+)
+
+END()
+
+RECURSE(
+ mock
+ proto
+)
diff --git a/ydb/library/yql/dq/actors/compute/dq_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_checkpoints.h
index e6775602a4..de7c2cea33 100644
--- a/ydb/library/yql/dq/actors/compute/dq_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_checkpoints.h
@@ -1,14 +1,14 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/actors/core/actorid.h>
namespace NYql {
namespace NDq {
-inline static NActors::TActorId MakeCheckpointStorageID() {
- const char name[12] = "cp_storage";
- return NActors::TActorId(0, TStringBuf(name, 12));
-}
+inline static NActors::TActorId MakeCheckpointStorageID() {
+ const char name[12] = "cp_storage";
+ return NActors::TActorId(0, TStringBuf(name, 12));
+}
} // namespace NDq
} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 84eeda2edb..3eaf2d1246 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -54,20 +54,20 @@ struct TEvDqCompute {
const ui64 FromSeqNo;
const ui64 ToSeqNo;
};
-
+
struct TEvRun : public NActors::TEventPB<TEvRun, NDqProto::TEvRun, TDqComputeEvents::EvRun> {};
-
+
struct TEvNewCheckpointCoordinator : public NActors::TEventPB<TEvNewCheckpointCoordinator,
- NDqProto::TEvNewCheckpointCoordinator, TDqComputeEvents::EvNewCheckpointCoordinator> {
-
- TEvNewCheckpointCoordinator() = default;
-
- TEvNewCheckpointCoordinator(ui64 generation, TString graphId) {
- Record.SetGeneration(generation);
- Record.SetGraphId(std::move(graphId));
- }
- };
-
+ NDqProto::TEvNewCheckpointCoordinator, TDqComputeEvents::EvNewCheckpointCoordinator> {
+
+ TEvNewCheckpointCoordinator() = default;
+
+ TEvNewCheckpointCoordinator(ui64 generation, TString graphId) {
+ Record.SetGeneration(generation);
+ Record.SetGraphId(std::move(graphId));
+ }
+ };
+
struct TEvNewCheckpointCoordinatorAck : public NActors::TEventPB<TEvNewCheckpointCoordinatorAck,
NDqProto::TEvNewCheckpointCoordinatorAck, TDqComputeEvents::EvNewCheckpointCoordinatorAck> {
@@ -75,63 +75,63 @@ struct TEvDqCompute {
};
struct TEvInjectCheckpoint : public NActors::TEventPB<TEvInjectCheckpoint,
- NDqProto::TEvInjectCheckpoint, TDqComputeEvents::EvInjectCheckpoint> {
-
- TEvInjectCheckpoint() = default;
-
- TEvInjectCheckpoint(ui64 id, ui64 generation) {
- Record.MutableCheckpoint()->SetId(id);
- Record.MutableCheckpoint()->SetGeneration(generation);
- Record.SetGeneration(generation);
- }
- };
-
+ NDqProto::TEvInjectCheckpoint, TDqComputeEvents::EvInjectCheckpoint> {
+
+ TEvInjectCheckpoint() = default;
+
+ TEvInjectCheckpoint(ui64 id, ui64 generation) {
+ Record.MutableCheckpoint()->SetId(id);
+ Record.MutableCheckpoint()->SetGeneration(generation);
+ Record.SetGeneration(generation);
+ }
+ };
+
struct TEvSaveTaskState : public NActors::TEventLocal<TEvSaveTaskState, TDqComputeEvents::EvSaveTaskState> {
TEvSaveTaskState(TString graphId, ui64 taskId, NDqProto::TCheckpoint checkpoint)
- : GraphId(std::move(graphId))
- , TaskId(taskId)
- , Checkpoint(std::move(checkpoint))
+ : GraphId(std::move(graphId))
+ , TaskId(taskId)
+ , Checkpoint(std::move(checkpoint))
{}
-
- const TString GraphId;
- const ui64 TaskId;
+
+ const TString GraphId;
+ const ui64 TaskId;
const NDqProto::TCheckpoint Checkpoint;
NDqProto::TComputeActorState State;
- };
-
+ };
+
struct TEvSaveTaskStateResult : public NActors::TEventPB<TEvSaveTaskStateResult,
NDqProto::TEvSaveTaskStateResult, TDqComputeEvents::EvSaveTaskStateResult> {};
-
+
struct TEvCommitState : public NActors::TEventPB<TEvCommitState,
- NDqProto::TEvCommitState, TDqComputeEvents::EvCommitState> {
-
- TEvCommitState() = default;
-
- TEvCommitState(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) {
- Record.MutableCheckpoint()->SetId(checkpointId);
- Record.MutableCheckpoint()->SetGeneration(checkpointGeneration);
- Record.SetGeneration(coordinatorGeneration);
- }
- };
-
+ NDqProto::TEvCommitState, TDqComputeEvents::EvCommitState> {
+
+ TEvCommitState() = default;
+
+ TEvCommitState(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) {
+ Record.MutableCheckpoint()->SetId(checkpointId);
+ Record.MutableCheckpoint()->SetGeneration(checkpointGeneration);
+ Record.SetGeneration(coordinatorGeneration);
+ }
+ };
+
struct TEvStateCommitted : public NActors::TEventPB<TEvStateCommitted,
- NDqProto::TEvStateCommitted, TDqComputeEvents::EvStateCommitted> {
-
- TEvStateCommitted() = default;
-
- TEvStateCommitted(ui64 id, ui64 generation, ui64 taskId) {
- Record.MutableCheckpoint()->SetId(id);
- Record.MutableCheckpoint()->SetGeneration(generation);
- Record.SetTaskId(taskId);
- }
- };
-
+ NDqProto::TEvStateCommitted, TDqComputeEvents::EvStateCommitted> {
+
+ TEvStateCommitted() = default;
+
+ TEvStateCommitted(ui64 id, ui64 generation, ui64 taskId) {
+ Record.MutableCheckpoint()->SetId(id);
+ Record.MutableCheckpoint()->SetGeneration(generation);
+ Record.SetTaskId(taskId);
+ }
+ };
+
struct TEvRestoreFromCheckpoint : public NActors::TEventPB<TEvRestoreFromCheckpoint,
- NDqProto::TEvRestoreFromCheckpoint, TDqComputeEvents::EvRestoreFromCheckpoint> {
-
- TEvRestoreFromCheckpoint() = default;
-
- TEvRestoreFromCheckpoint(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) {
+ NDqProto::TEvRestoreFromCheckpoint, TDqComputeEvents::EvRestoreFromCheckpoint> {
+
+ TEvRestoreFromCheckpoint() = default;
+
+ TEvRestoreFromCheckpoint(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) {
Init(checkpointId, checkpointGeneration, coordinatorGeneration);
Record.MutableStateLoadPlan()->SetStateType(NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN); // default
}
@@ -143,16 +143,16 @@ struct TEvDqCompute {
private:
void Init(ui64 checkpointId, ui64 checkpointGeneration, ui64 coordinatorGeneration) {
- Record.MutableCheckpoint()->SetId(checkpointId);
- Record.MutableCheckpoint()->SetGeneration(checkpointGeneration);
- Record.SetGeneration(coordinatorGeneration);
- }
- };
-
+ Record.MutableCheckpoint()->SetId(checkpointId);
+ Record.MutableCheckpoint()->SetGeneration(checkpointGeneration);
+ Record.SetGeneration(coordinatorGeneration);
+ }
+ };
+
struct TEvRestoreFromCheckpointResult : public NActors::TEventPB<TEvRestoreFromCheckpointResult,
NDqProto::TEvRestoreFromCheckpointResult, TDqComputeEvents::EvRestoreFromCheckpointResult> {
using TBaseEventPB = NActors::TEventPB<TEvRestoreFromCheckpointResult, NDqProto::TEvRestoreFromCheckpointResult, TDqComputeEvents::EvRestoreFromCheckpointResult>;
-
+
using TBaseEventPB::TBaseEventPB;
TEvRestoreFromCheckpointResult(const NDqProto::TCheckpoint& checkpoint, ui64 taskId, NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status) {
@@ -164,28 +164,28 @@ struct TEvDqCompute {
struct TEvGetTaskState : public NActors::TEventLocal<TEvGetTaskState, TDqComputeEvents::EvGetTaskState> {
TEvGetTaskState(TString graphId, const std::vector<ui64>& taskIds, NDqProto::TCheckpoint checkpoint, ui64 generation)
- : GraphId(std::move(graphId))
+ : GraphId(std::move(graphId))
, TaskIds(taskIds)
- , Checkpoint(std::move(checkpoint))
+ , Checkpoint(std::move(checkpoint))
, Generation(generation) {}
-
- const TString GraphId;
+
+ const TString GraphId;
const std::vector<ui64> TaskIds;
const NDqProto::TCheckpoint Checkpoint;
- const ui64 Generation;
- };
-
+ const ui64 Generation;
+ };
+
struct TEvGetTaskStateResult : public NActors::TEventLocal<TEvGetTaskStateResult, TDqComputeEvents::EvGetTaskStateResult> {
TEvGetTaskStateResult(NDqProto::TCheckpoint checkpoint, TIssues issues, ui64 generation)
- : Checkpoint(std::move(checkpoint))
- , Issues(std::move(issues))
+ : Checkpoint(std::move(checkpoint))
+ , Issues(std::move(issues))
, Generation(generation) {}
-
+
const NDqProto::TCheckpoint Checkpoint;
std::vector<NDqProto::TComputeActorState> States;
const TIssues Issues;
- const ui64 Generation;
- };
+ const ui64 Generation;
+ };
};
struct TDqExecutionSettings {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index 9d943cbca3..c5368313b8 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -1,37 +1,37 @@
-#include "dq_compute_actor_checkpoints.h"
-#include "dq_checkpoints.h"
+#include "dq_compute_actor_checkpoints.h"
+#include "dq_checkpoints.h"
#include "dq_compute_actor_impl.h"
-
+
#include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h>
#include <algorithm>
-#define LOG_D(s) \
+#define LOG_D(s) \
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s)
-#define LOG_I(s) \
+#define LOG_I(s) \
LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s)
-#define LOG_W(s) \
+#define LOG_W(s) \
LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s)
-#define LOG_E(s) \
+#define LOG_E(s) \
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s)
-
-#define LOG_CP_D(s) \
+
+#define LOG_CP_D(s) \
LOG_D("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
-#define LOG_CP_I(s) \
+#define LOG_CP_I(s) \
LOG_I("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
-#define LOG_CP_E(s) \
+#define LOG_CP_E(s) \
LOG_E("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
-
+
namespace NYql::NDq {
-
+
using namespace NActors;
namespace {
TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) {
return TStringBuilder() << checkpoint.GetGeneration() << "." << checkpoint.GetId();
-}
-
+}
+
bool IsIngressTask(const NDqProto::TDqTask& task) {
for (const auto& input : task.GetInputs()) {
if (!input.HasSource()) {
@@ -106,16 +106,16 @@ NDqProto::TComputeActorState CombineForeignState(
} // namespace
TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor)
- : TActor(&TDqComputeActorCheckpoints::StateFunc)
+ : TActor(&TDqComputeActorCheckpoints::StateFunc)
, TxId(txId)
- , Task(std::move(task))
+ , Task(std::move(task))
, IngressTask(IsIngressTask(Task))
- , CheckpointStorage(MakeCheckpointStorageID())
+ , CheckpointStorage(MakeCheckpointStorageID())
, ComputeActor(computeActor)
, PendingCheckpoint(Task)
{
-}
-
+}
+
void TDqComputeActorCheckpoints::Init(NActors::TActorId computeActorId, NActors::TActorId checkpointsId) {
EventsQueue.Init(TxId, computeActorId, checkpointsId);
}
@@ -133,28 +133,28 @@ STRICT_STFUNC(TDqComputeActorCheckpoints::StateFunc,
hFunc(TEvRetryQueuePrivate::TEvRetry, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
)
-
+
namespace {
// Get generation for protobuf event.
template <class E>
auto GetGeneration(const E& ev) -> decltype(ev->Get()->Record.GetGeneration()) {
return ev->Get()->Record.GetGeneration();
-}
-
+}
+
// Get generation for local event.
-template <class E>
+template <class E>
auto GetGeneration(const E& ev) -> decltype(ev->Get()->Generation) {
return ev->Get()->Generation;
}
-
+
ui64 GetGeneration(const TEvDqCompute::TEvSaveTaskStateResult::TPtr& ev) {
return ev->Get()->Record.GetCheckpoint().GetGeneration();
-}
-
+}
+
} // anonymous namespace
-template <class E>
+template <class E>
bool TDqComputeActorCheckpoints::ShouldIgnoreOldCoordinator(const E& ev, bool verifyOnGenerationFromFuture) {
const ui64 generation = GetGeneration(ev);
Y_VERIFY(!verifyOnGenerationFromFuture || !CheckpointCoordinator || generation <= CheckpointCoordinator->Generation,
@@ -165,72 +165,72 @@ bool TDqComputeActorCheckpoints::ShouldIgnoreOldCoordinator(const E& ev, bool ve
return true;
}
return false;
-}
-
+}
+
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr& ev) {
if (ShouldIgnoreOldCoordinator(ev, false)) {
return;
}
const ui64 newGeneration = ev->Get()->Record.GetGeneration();
- LOG_I("Got TEvNewCheckpointCoordinator event: generation " << newGeneration << ", actorId: " << ev->Sender);
-
+ LOG_I("Got TEvNewCheckpointCoordinator event: generation " << newGeneration << ", actorId: " << ev->Sender);
+
if (CheckpointCoordinator && CheckpointCoordinator->Generation == newGeneration) { // The same message. It was retry from coordinator.
- Y_VERIFY(CheckpointCoordinator->ActorId == ev->Sender, "there shouldn't be two different checkpoint coordinators with the same generation");
+ Y_VERIFY(CheckpointCoordinator->ActorId == ev->Sender, "there shouldn't be two different checkpoint coordinators with the same generation");
Y_VERIFY(GraphId == ev->Get()->Record.GetGraphId());
- return;
- }
-
+ return;
+ }
+
if (CheckpointCoordinator) {
LOG_I("Replace stale checkpoint coordinator (generation = " << CheckpointCoordinator->Generation << ") with a new one");
} else {
LOG_I("Assign checkpoint coordinator (generation = " << newGeneration << ")");
}
- CheckpointCoordinator = TCheckpointCoordinatorId(ev->Sender, newGeneration);
+ CheckpointCoordinator = TCheckpointCoordinatorId(ev->Sender, newGeneration);
GraphId = ev->Get()->Record.GetGraphId();
-
+
EventsQueue.OnNewRecipientId(ev->Sender);
Y_VERIFY(EventsQueue.OnEventReceived(ev->Get()));
EventsQueue.Send(new TEvDqCompute::TEvNewCheckpointCoordinatorAck());
- if (PendingCheckpoint) {
- LOG_I("Drop pending checkpoint since coordinator is stale");
- PendingCheckpoint.Clear();
- ComputeActor->ResumeInputs();
- }
-}
-
+ if (PendingCheckpoint) {
+ LOG_I("Drop pending checkpoint since coordinator is stale");
+ PendingCheckpoint.Clear();
+ ComputeActor->ResumeInputs();
+ }
+}
+
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr& ev) {
if (ShouldIgnoreOldCoordinator(ev)) {
return;
}
YQL_ENSURE(IngressTask, "Shouldn't inject barriers into non-ingress tasks");
- YQL_ENSURE(!PendingCheckpoint);
-
- PendingCheckpoint = ev->Get()->Record.GetCheckpoint();
- LOG_CP_I("Got TEvInjectCheckpoint");
- ComputeActor->ResumeExecution();
-}
-
+ YQL_ENSURE(!PendingCheckpoint);
+
+ PendingCheckpoint = ev->Get()->Record.GetCheckpoint();
+ LOG_CP_I("Got TEvInjectCheckpoint");
+ ComputeActor->ResumeExecution();
+}
+
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvSaveTaskStateResult::TPtr& ev) {
if (ShouldIgnoreOldCoordinator(ev)) {
- return;
- }
+ return;
+ }
EventsQueue.Send(ev->Release().Release(), ev->Cookie);
-}
-
+}
+
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::TPtr& ev) {
if (ShouldIgnoreOldCoordinator(ev)) {
- return;
- }
-
+ return;
+ }
+
if (!EventsQueue.OnEventReceived(ev)) {
return;
}
- ComputeActor->Stop();
+ ComputeActor->Stop();
TaskLoadPlan = ev->Get()->Record.GetStateLoadPlan();
const auto& checkpoint = ev->Get()->Record.GetCheckpoint();
LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got TEvRestoreFromCheckpoint event with plan " << TaskLoadPlan);
@@ -271,38 +271,38 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
break;
}
}
-}
-
+}
+
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPtr& ev) {
if (ShouldIgnoreOldCoordinator(ev)) {
- return;
- }
-
- auto& checkpoint = ev->Get()->Checkpoint;
+ return;
+ }
+
+ auto& checkpoint = ev->Get()->Checkpoint;
std::vector<ui64> taskIds;
size_t taskIdsSize = 1;
if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) {
taskIds = TaskIdsFromLoadPlan(TaskLoadPlan);
taskIdsSize = taskIds.size();
}
-
+
if (!ev->Get()->Issues.Empty()) {
LOG_E("[Checkpoint " << MakeStringForLog(checkpoint)
<< "] Can't get state from storage: " << ev->Get()->Issues.ToString());
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
return;
}
-
+
if (ev->Get()->States.size() != taskIdsSize) {
LOG_E("[Checkpoint " << MakeStringForLog(checkpoint)
<< "] Got unexpected states count. States count: " << ev->Get()->States.size()
<< ". Expected states count: " << taskIdsSize);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
- return;
- }
-
+ return;
+ }
+
LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got TEvGetTaskStateResult event, restoring state");
- try {
+ try {
if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN) {
ComputeActor->LoadState(ev->Get()->States[0]);
} else if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) {
@@ -313,41 +313,41 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt
NDqProto::NDqStateLoadPlan::EStateType_Name(TaskLoadPlan.GetStateType()).c_str(),
static_cast<int>(TaskLoadPlan.GetStateType()));
}
- } catch (const std::exception& e) {
+ } catch (const std::exception& e) {
LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) << "] Failed to load state: " << e.what());
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), ev->Cookie);
LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Checkpoint state restoration aborted");
- return;
- }
-
+ return;
+ }
+
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), ev->Cookie);
LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Checkpoint state restored");
-}
-
+}
+
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRun::TPtr& ev) {
EventsQueue.OnEventReceived(ev);
}
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvCommitState::TPtr& ev) {
if (ShouldIgnoreOldCoordinator(ev)) {
- return;
- }
-
+ return;
+ }
+
if (!EventsQueue.OnEventReceived(ev)) {
return;
}
- // No actual commit at the moment: will be done in further commits
- auto checkpoint = ev->Get()->Record.GetCheckpoint();
+ // No actual commit at the moment: will be done in further commits
+ auto checkpoint = ev->Get()->Record.GetCheckpoint();
ComputeActor->CommitState(checkpoint);
EventsQueue.Send(new TEvDqCompute::TEvStateCommitted(checkpoint.GetId(), checkpoint.GetGeneration(), Task.GetId()), ev->Cookie);
-}
-
-void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvPoison::TPtr&) {
- LOG_D("pass away");
- PassAway();
-}
-
+}
+
+void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvPoison::TPtr&) {
+ LOG_D("pass away");
+ PassAway();
+}
+
void TDqComputeActorCheckpoints::Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
LOG_I("Handle disconnected node " << ev->Get()->NodeId);
EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId);
@@ -363,62 +363,62 @@ void TDqComputeActorCheckpoints::Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev
EventsQueue.Retry();
}
-bool TDqComputeActorCheckpoints::HasPendingCheckpoint() const {
+bool TDqComputeActorCheckpoints::HasPendingCheckpoint() const {
return PendingCheckpoint;
-}
-
+}
+
bool TDqComputeActorCheckpoints::ComputeActorStateSaved() const {
return PendingCheckpoint && PendingCheckpoint.SavedComputeActorState;
}
-void TDqComputeActorCheckpoints::DoCheckpoint() {
- Y_VERIFY(CheckpointCoordinator);
+void TDqComputeActorCheckpoints::DoCheckpoint() {
+ Y_VERIFY(CheckpointCoordinator);
Y_VERIFY(PendingCheckpoint);
-
- LOG_CP_I("Performing task checkpoint");
- if (SaveState()) {
- LOG_CP_D("Injecting checkpoint barrier to outputs");
+
+ LOG_CP_I("Performing task checkpoint");
+ if (SaveState()) {
+ LOG_CP_D("Injecting checkpoint barrier to outputs");
ComputeActor->InjectBarrierToOutputs(*PendingCheckpoint.Checkpoint);
TryToSavePendingCheckpoint();
- }
-}
-
-[[nodiscard]]
-bool TDqComputeActorCheckpoints::SaveState() {
- LOG_CP_D("Saving task state");
-
- try {
+ }
+}
+
+[[nodiscard]]
+bool TDqComputeActorCheckpoints::SaveState() {
+ LOG_CP_D("Saving task state");
+
+ try {
Y_VERIFY(!PendingCheckpoint.SavedComputeActorState);
PendingCheckpoint.SavedComputeActorState = true;
ComputeActor->SaveState(*PendingCheckpoint.Checkpoint, PendingCheckpoint.ComputeActorState);
- } catch (const std::exception& e) {
+ } catch (const std::exception& e) {
PendingCheckpoint.Clear();
- LOG_CP_E("Failed to save state: " << e.what());
-
+ LOG_CP_E("Failed to save state: " << e.what());
+
auto resultEv = MakeHolder<TEvDqCompute::TEvSaveTaskStateResult>();
resultEv->Record.MutableCheckpoint()->CopyFrom(*PendingCheckpoint.Checkpoint);
- resultEv->Record.SetTaskId(Task.GetId());
+ resultEv->Record.SetTaskId(Task.GetId());
resultEv->Record.SetStatus(NDqProto::TEvSaveTaskStateResult::INTERNAL_ERROR);
EventsQueue.Send(std::move(resultEv));
-
- return false;
- }
-
+
+ return false;
+ }
+
LOG_CP_D("Compute actor state saved");
- return true;
-}
-
+ return true;
+}
+
void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId) {
- if (!PendingCheckpoint) {
- PendingCheckpoint = checkpoint;
- } else {
+ if (!PendingCheckpoint) {
+ PendingCheckpoint = checkpoint;
+ } else {
YQL_ENSURE(PendingCheckpoint.Checkpoint->GetGeneration() == checkpoint.GetGeneration());
YQL_ENSURE(PendingCheckpoint.Checkpoint->GetId() == checkpoint.GetId());
- }
- LOG_CP_I("Got checkpoint barrier from channel " << channelId);
+ }
+ LOG_CP_I("Got checkpoint barrier from channel " << channelId);
ComputeActor->ResumeExecution();
-}
-
+}
+
void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) {
Y_VERIFY(CheckpointCoordinator);
Y_VERIFY(checkpoint.GetGeneration() <= CheckpointCoordinator->Generation);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index 27fd851338..974a3089e2 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -1,13 +1,13 @@
-#pragma once
-
-#include "dq_compute_actor.h"
+#pragma once
+
+#include "dq_compute_actor.h"
#include "dq_compute_actor_sinks.h"
#include "retry_queue.h"
-
+
#include <ydb/library/yql/dq/common/dq_common.h>
-#include <library/cpp/actors/core/log.h>
-
+#include <library/cpp/actors/core/log.h>
+
#include <util/generic/ptr.h>
#include <algorithm>
@@ -19,21 +19,21 @@ enum ECheckpointingMode;
} // namespace NYql::NDqProto
namespace NYql::NDq {
-
+
NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const NDqProto::TDqTask& task);
class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpoints>
{
- struct TCheckpointCoordinatorId {
+ struct TCheckpointCoordinatorId {
NActors::TActorId ActorId;
- ui64 Generation;
-
+ ui64 Generation;
+
TCheckpointCoordinatorId(NActors::TActorId actorId, ui64 generation)
- : ActorId(actorId)
- , Generation(generation) {
- }
- };
-
+ : ActorId(actorId)
+ , Generation(generation) {
+ }
+ };
+
struct TPendingCheckpoint {
TPendingCheckpoint(const NDqProto::TDqTask& task)
: SinksCount(GetSinksCount(task))
@@ -63,42 +63,42 @@ class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpo
bool SavedComputeActorState = false;
};
-public:
+public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR_CHECKPOINTS";
- struct ICallbacks {
- [[nodiscard]]
- virtual bool ReadyToCheckpoint() const = 0;
+ struct ICallbacks {
+ [[nodiscard]]
+ virtual bool ReadyToCheckpoint() const = 0;
virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const = 0;
virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0;
virtual void InjectBarrierToOutputs(const NDqProto::TCheckpoint& checkpoint) = 0;
- virtual void ResumeInputs() = 0;
-
- virtual void Start() = 0;
- virtual void Stop() = 0;
- virtual void ResumeExecution() = 0;
-
+ virtual void ResumeInputs() = 0;
+
+ virtual void Start() = 0;
+ virtual void Stop() = 0;
+ virtual void ResumeExecution() = 0;
+
virtual void LoadState(const NDqProto::TComputeActorState& state) = 0;
-
+
virtual ~ICallbacks() = default;
- };
-
+ };
+
TDqComputeActorCheckpoints(const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor);
void Init(NActors::TActorId computeActorId, NActors::TActorId checkpointsId);
- [[nodiscard]]
- bool HasPendingCheckpoint() const;
+ [[nodiscard]]
+ bool HasPendingCheckpoint() const;
bool ComputeActorStateSaved() const;
- void DoCheckpoint();
- bool SaveState();
+ void DoCheckpoint();
+ bool SaveState();
void RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId);
-
+
// Sink actor support.
void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint);
void TryToSavePendingCheckpoint();
-private:
- STATEFN(StateFunc);
+private:
+ STATEFN(StateFunc);
void Handle(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr&);
void Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr&);
void Handle(TEvDqCompute::TEvSaveTaskStateResult::TPtr&);
@@ -106,33 +106,33 @@ private:
void Handle(TEvDqCompute::TEvRestoreFromCheckpoint::TPtr&);
void Handle(TEvDqCompute::TEvGetTaskStateResult::TPtr&);
void Handle(TEvDqCompute::TEvRun::TPtr& ev);
- void Handle(NActors::TEvents::TEvPoison::TPtr&);
+ void Handle(NActors::TEvents::TEvPoison::TPtr&);
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev);
-
+
void PassAway() override;
// Validates generation and returns true if it is from old coordinator.
- template <class E>
+ template <class E>
bool ShouldIgnoreOldCoordinator(const E& ev, bool verifyOnGenerationFromFuture = true);
-
-private:
+
+private:
const TTxId TxId;
const NDqProto::TDqTask Task;
const bool IngressTask;
-
+
const NActors::TActorId CheckpointStorage;
- TString GraphId;
-
+ TString GraphId;
+
ICallbacks* ComputeActor = nullptr;
-
- TMaybe<TCheckpointCoordinatorId> CheckpointCoordinator;
+
+ TMaybe<TCheckpointCoordinatorId> CheckpointCoordinator;
TPendingCheckpoint PendingCheckpoint;
TRetryEventsQueue EventsQueue;
// Restore
NYql::NDqProto::NDqStateLoadPlan::TTaskPlan TaskLoadPlan;
-};
-
+};
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index deaadefb42..c9dfd4314a 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -2,7 +2,7 @@
#include "dq_compute_actor.h"
#include "dq_compute_actor_channels.h"
-#include "dq_compute_actor_checkpoints.h"
+#include "dq_compute_actor_checkpoints.h"
#include "dq_compute_actor_sinks.h"
#include "dq_compute_actor_sources.h"
#include "dq_compute_issues_buffer.h"
@@ -59,7 +59,7 @@ constexpr ui32 IssuesBufferSize = 16;
template<typename TDerived>
class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
, public TDqComputeActorChannels::ICallbacks
- , public TDqComputeActorCheckpoints::ICallbacks
+ , public TDqComputeActorCheckpoints::ICallbacks
, public IDqSourceActor::ICallbacks
, public IDqSinkActor::ICallbacks
{
@@ -267,9 +267,9 @@ protected:
}
if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
- Checkpoints->DoCheckpoint();
- }
-
+ Checkpoints->DoCheckpoint();
+ }
+
ProcessOutputsImpl(status);
}
@@ -400,12 +400,12 @@ protected:
Channels->Receive(handle, NActors::TActivationContext::AsActorContext());
}
- if (Checkpoints) {
+ if (Checkpoints) {
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), this->SelfId(),
new NActors::TEvents::TEvPoison);
Checkpoints->Receive(handle, NActors::TActivationContext::AsActorContext());
- }
-
+ }
+
for (auto& [_, source] : SourcesMap) {
if (source.Actor) {
source.SourceActor->PassAway();
@@ -497,7 +497,7 @@ protected:
}
void ContinueExecute() {
- if (!ResumeEventScheduled && Running) {
+ if (!ResumeEventScheduled && Running) {
ResumeEventScheduled = true;
this->Send(this->SelfId(), new TEvDqCompute::TEvResumeExecution());
}
@@ -522,14 +522,14 @@ public:
channel->Push(std::move(*channelData.MutableData()));
}
- if (channelData.HasCheckpoint()) {
+ if (channelData.HasCheckpoint()) {
Y_VERIFY(inputChannel->CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
Y_VERIFY(Checkpoints);
- const auto& checkpoint = channelData.GetCheckpoint();
- inputChannel->Pause(checkpoint);
- Checkpoints->RegisterCheckpoint(checkpoint, channelData.GetChannelId());
- }
-
+ const auto& checkpoint = channelData.GetCheckpoint();
+ inputChannel->Pause(checkpoint);
+ Checkpoints->RegisterCheckpoint(checkpoint, channelData.GetChannelId());
+ }
+
if (channelData.GetFinished()) {
channel->Finish();
}
@@ -572,22 +572,22 @@ public:
}
protected:
- bool ReadyToCheckpoint() const override {
- for (auto& [id, channelInfo] : InputChannelsMap) {
+ bool ReadyToCheckpoint() const override {
+ for (auto& [id, channelInfo] : InputChannelsMap) {
if (channelInfo.CheckpointingMode == NDqProto::CHECKPOINTING_MODE_DISABLED) {
continue;
}
- if (!channelInfo.IsPaused()) {
- return false;
- }
- if (!channelInfo.Channel->Empty()) {
- return false;
- }
- }
- return true;
- }
-
+ if (!channelInfo.IsPaused()) {
+ return false;
+ }
+ if (!channelInfo.Channel->Empty()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
CA_LOG_D("Save state");
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
@@ -602,8 +602,8 @@ protected:
source.SourceActor->SaveState(checkpoint, sourceState);
sourceState.SetInputIndex(inputIndex);
}
- }
-
+ }
+
void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
CA_LOG_D("Commit state");
for (auto& [inputIndex, source] : SourcesMap) {
@@ -614,20 +614,20 @@ protected:
void InjectBarrierToOutputs(const NDqProto::TCheckpoint& checkpoint) override {
Y_VERIFY(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
- for (const auto& [id, channelInfo] : OutputChannelsMap) {
+ for (const auto& [id, channelInfo] : OutputChannelsMap) {
channelInfo.Channel->Push(NDqProto::TCheckpoint(checkpoint));
- }
+ }
for (const auto& [outputIndex, sink] : SinksMap) {
sink.Sink->Push(NDqProto::TCheckpoint(checkpoint));
}
- }
-
- void ResumeInputs() override {
- for (auto& [id, channelInfo] : InputChannelsMap) {
- channelInfo.Resume();
- }
- }
-
+ }
+
+ void ResumeInputs() override {
+ for (auto& [id, channelInfo] : InputChannelsMap) {
+ channelInfo.Resume();
+ }
+ }
+
void LoadState(const NDqProto::TComputeActorState& state) override {
CA_LOG_D("Load state");
auto guard = BindAllocator();
@@ -653,19 +653,19 @@ protected:
return;
}
ythrow yexception() << "Invalid state version " << version;
- }
-
- void Start() override {
- Running = true;
+ }
+
+ void Start() override {
+ Running = true;
State = NDqProto::COMPUTE_STATE_EXECUTING;
- ContinueExecute();
- }
-
- void Stop() override {
- Running = false;
+ ContinueExecute();
+ }
+
+ void Stop() override {
+ Running = false;
State = NDqProto::COMPUTE_STATE_UNKNOWN;
- }
-
+ }
+
protected:
struct TInputChannelInfo {
ui64 ChannelId;
@@ -674,28 +674,28 @@ protected:
std::optional<NDqProto::TCheckpoint> PendingCheckpoint;
const NDqProto::ECheckpointingMode CheckpointingMode;
ui64 FreeSpace = 0;
-
+
explicit TInputChannelInfo(ui64 channelId, NDqProto::ECheckpointingMode checkpointingMode)
: ChannelId(channelId)
, CheckpointingMode(checkpointingMode)
{
}
- bool IsPaused() const {
+ bool IsPaused() const {
return PendingCheckpoint.has_value();
- }
-
+ }
+
void Pause(const NDqProto::TCheckpoint& checkpoint) {
- YQL_ENSURE(!IsPaused());
+ YQL_ENSURE(!IsPaused());
YQL_ENSURE(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
- PendingCheckpoint = checkpoint;
+ PendingCheckpoint = checkpoint;
Channel->Pause();
- }
-
- void Resume() {
+ }
+
+ void Resume() {
PendingCheckpoint.reset();
Channel->Resume();
- }
+ }
};
struct TSourceInfo {
@@ -785,9 +785,9 @@ protected:
protected:
void HandleExecuteBase(TEvDqCompute::TEvResumeExecution::TPtr&) {
ResumeEventScheduled = false;
- if (Running) {
+ if (Running) {
DoExecute();
- }
+ }
}
void HandleExecuteBase(TEvDqCompute::TEvChannelsInfo::TPtr& ev) {
@@ -876,15 +876,15 @@ protected:
void HandleExecuteBase(TEvDqCompute::TEvRun::TPtr& ev) {
CA_LOG_D("Got TEvRun from actor " << ev->Sender);
- Start();
+ Start();
// Event from coordinator should be processed to confirm seq no.
TAutoPtr<NActors::IEventHandle> iev(ev.Release());
if (Checkpoints) {
Checkpoints->Receive(iev, NActors::TActivationContext::AsActorContext());
}
- }
-
+ }
+
void HandleExecuteBase(TEvDqCompute::TEvStateRequest::TPtr& ev) {
CA_LOG_D("Got TEvStateRequest from actor " << ev->Sender << " TaskId: " << Task.GetId() << " PingCookie: " << ev->Cookie);
auto evState = MakeHolder<TEvDqCompute::TEvState>();
@@ -896,15 +896,15 @@ protected:
}
void HandleExecuteBase(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr& ev) {
- if (!Checkpoints) {
+ if (!Checkpoints) {
Checkpoints = new TDqComputeActorCheckpoints(TxId, Task, this);
Checkpoints->Init(this->SelfId(), this->RegisterWithSameMailbox(Checkpoints));
Channels->SetCheckpointsSupport();
- }
- TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), ev->Sender, ev->Release().Release());
+ }
+ TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), ev->Sender, ev->Release().Release());
Checkpoints->Receive(handle, NActors::TActivationContext::AsActorContext());
- }
-
+ }
+
void HandleExecuteBase(TEvDq::TEvAbortExecution::TPtr& ev) {
TString message = ev->Get()->Record.GetMessage();
CA_LOG_E("Handle abort execution event from: " << ev->Sender
@@ -992,10 +992,10 @@ private:
NDqProto::TData data;
NDqProto::TCheckpoint checkpoint;
-
- bool hasData = channel->Pop(data, bytes);
- bool hasCheckpoint = channel->Pop(checkpoint);
- if (!hasData && !hasCheckpoint) {
+
+ bool hasData = channel->Pop(data, bytes);
+ bool hasCheckpoint = channel->Pop(checkpoint);
+ if (!hasData && !hasCheckpoint) {
if (!channel->IsFinished()) {
CA_LOG_D("output channelId: " << channel->GetChannelId() << ", nothing to send and is not finished");
return 0; // channel is empty and not finished yet
@@ -1006,19 +1006,19 @@ private:
const bool becameFinished = !wasFinished && outputChannel.Finished;
ui32 dataSize = data.GetRaw().size();
- ui32 checkpointSize = checkpoint.ByteSize();
+ ui32 checkpointSize = checkpoint.ByteSize();
NDqProto::TChannelData channelData;
channelData.SetChannelId(channel->GetChannelId());
channelData.SetFinished(outputChannel.Finished);
- if (hasData) {
- channelData.MutableData()->Swap(&data);
- }
- if (hasCheckpoint) {
- channelData.MutableCheckpoint()->Swap(&checkpoint);
- CA_LOG_I("Resume inputs");
- ResumeInputs();
- }
+ if (hasData) {
+ channelData.MutableData()->Swap(&data);
+ }
+ if (hasCheckpoint) {
+ channelData.MutableCheckpoint()->Swap(&checkpoint);
+ CA_LOG_I("Resume inputs");
+ ResumeInputs();
+ }
if (hasData || hasCheckpoint || becameFinished) {
Channels->SendChannelData(std::move(channelData));
@@ -1467,7 +1467,7 @@ protected:
const NDqProto::ECheckpointingMode CheckpointingMode;
TIntrusivePtr<IDqTaskRunner> TaskRunner;
TDqComputeActorChannels* Channels = nullptr;
- TDqComputeActorCheckpoints* Checkpoints = nullptr;
+ TDqComputeActorCheckpoints* Checkpoints = nullptr;
THashMap<ui64, TInputChannelInfo> InputChannelsMap; // Channel id -> Channel info
THashMap<ui64, TSourceInfo> SourcesMap; // Input index -> Source info
THashMap<ui64, TOutputChannelInfo> OutputChannelsMap; // Channel id -> Channel info
@@ -1498,7 +1498,7 @@ protected:
TProcessOutputsState ProcessOutputsState;
private:
- bool Running = true;
+ bool Running = true;
TInstant LastSendStatsTime;
};
diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make
index c4b20bd502..ae47a3e5b0 100644
--- a/ydb/library/yql/dq/actors/compute/ya.make
+++ b/ydb/library/yql/dq/actors/compute/ya.make
@@ -9,7 +9,7 @@ SRCS(
dq_compute_actor.cpp
dq_async_compute_actor.cpp
dq_compute_actor_channels.cpp
- dq_compute_actor_checkpoints.cpp
+ dq_compute_actor_checkpoints.cpp
dq_compute_actor_io_actors_factory.cpp
dq_compute_actor_stats.cpp
dq_compute_issues_buffer.cpp
diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h
index 698377a393..800c1818c3 100644
--- a/ydb/library/yql/dq/actors/dq_events_ids.h
+++ b/ydb/library/yql/dq/actors/dq_events_ids.h
@@ -33,18 +33,18 @@ struct TDqComputeEvents {
ReservedKqp_EvScanInitActor,
ReservedKqp_EvRemoteScanData,
ReservedKqp_EvRemoteScanDataAck,
-
+
EvRun = EventSpaceBegin(TDqEvents::ES_DQ_COMPUTE),
EvNewCheckpointCoordinator,
- EvInjectCheckpoint,
- EvSaveTaskState,
- EvSaveTaskStateResult,
- EvCommitState,
- EvStateCommitted,
- EvRestoreFromCheckpoint,
- EvRestoreFromCheckpointResult,
- EvGetTaskState,
+ EvInjectCheckpoint,
+ EvSaveTaskState,
+ EvSaveTaskStateResult,
+ EvCommitState,
+ EvStateCommitted,
+ EvRestoreFromCheckpoint,
+ EvRestoreFromCheckpointResult,
+ EvGetTaskState,
EvGetTaskStateResult,
EvStateRequest,
EvNewCheckpointCoordinatorAck,
diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto
index 0e38ab3bbc..6d70a971ae 100644
--- a/ydb/library/yql/dq/actors/protos/dq_events.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_events.proto
@@ -11,7 +11,7 @@ import "ydb/public/api/protos/ydb_issue_message.proto";
message TCheckpoint {
optional uint64 Id = 1;
- optional uint64 Generation = 2;
+ optional uint64 Generation = 2;
};
/*
@@ -19,7 +19,7 @@ message TCheckpoint {
1) Data
2) Checkpoint
3) Finished
-*/
+*/
message TChannelData {
optional uint64 ChannelId = 1;
optional NYql.NDqProto.TData Data = 2;
@@ -27,10 +27,10 @@ message TChannelData {
optional TCheckpoint Checkpoint = 4;
};
-message TEvRun {
+message TEvRun {
optional TMessageTransportMeta TransportMeta = 100;
-}
-
+}
+
message TEvChannelsInfo {
repeated NYql.NDqProto.TChannel Update = 1;
};
@@ -102,76 +102,76 @@ message TComputeRuntimeSettings {
optional uint32 TasksOnNodeCount = 5; // approx
optional TRlPath RlPath = 7;
}
-
-message TEvNewCheckpointCoordinator {
- optional uint64 Generation = 1;
- optional string GraphId = 2;
+
+message TEvNewCheckpointCoordinator {
+ optional uint64 Generation = 1;
+ optional string GraphId = 2;
optional TMessageTransportMeta TransportMeta = 100;
-}
-
+}
+
message TEvNewCheckpointCoordinatorAck {
optional TMessageTransportMeta TransportMeta = 100;
}
-message TEvInjectCheckpoint {
- optional TCheckpoint Checkpoint = 1;
- optional uint64 Generation = 2;
+message TEvInjectCheckpoint {
+ optional TCheckpoint Checkpoint = 1;
+ optional uint64 Generation = 2;
optional TMessageTransportMeta TransportMeta = 100;
-}
-
-message TEvSaveTaskStateResult {
- enum EStatus {
- UNSPECIFIED = 0;
- OK = 1;
- STORAGE_ERROR = 2;
- INTERNAL_ERROR = 3;
- STATE_TOO_BIG = 4;
- }
- optional TCheckpoint Checkpoint = 1;
- optional uint64 TaskId = 2;
- optional EStatus Status = 3;
- optional uint64 StateSizeBytes = 4;
+}
+
+message TEvSaveTaskStateResult {
+ enum EStatus {
+ UNSPECIFIED = 0;
+ OK = 1;
+ STORAGE_ERROR = 2;
+ INTERNAL_ERROR = 3;
+ STATE_TOO_BIG = 4;
+ }
+ optional TCheckpoint Checkpoint = 1;
+ optional uint64 TaskId = 2;
+ optional EStatus Status = 3;
+ optional uint64 StateSizeBytes = 4;
optional TMessageTransportMeta TransportMeta = 100;
-}
-
-message TEvCommitState {
- optional TCheckpoint Checkpoint = 1;
- optional uint64 Generation = 2;
+}
+
+message TEvCommitState {
+ optional TCheckpoint Checkpoint = 1;
+ optional uint64 Generation = 2;
optional TMessageTransportMeta TransportMeta = 100;
-}
-
-message TEvStateCommitted {
- optional TCheckpoint Checkpoint = 1;
- optional uint64 TaskId = 2;
+}
+
+message TEvStateCommitted {
+ optional TCheckpoint Checkpoint = 1;
+ optional uint64 TaskId = 2;
optional TMessageTransportMeta TransportMeta = 100;
-}
-
-message TEvRestoreFromCheckpoint {
- optional TCheckpoint Checkpoint = 1;
- optional uint64 Generation = 2;
+}
+
+message TEvRestoreFromCheckpoint {
+ optional TCheckpoint Checkpoint = 1;
+ optional uint64 Generation = 2;
optional NDqStateLoadPlan.TTaskPlan StateLoadPlan = 3;
optional TMessageTransportMeta TransportMeta = 100;
-}
-
-message TEvRestoreFromCheckpointResult {
- enum ERestoreStatus {
- UNSPECIFIED = 0;
- OK = 1;
- STORAGE_ERROR = 2;
- INTERNAL_ERROR = 3;
- }
- optional TCheckpoint Checkpoint = 1;
- optional uint64 TaskId = 2;
- optional ERestoreStatus Status = 3;
+}
+
+message TEvRestoreFromCheckpointResult {
+ enum ERestoreStatus {
+ UNSPECIFIED = 0;
+ OK = 1;
+ STORAGE_ERROR = 2;
+ INTERNAL_ERROR = 3;
+ }
+ optional TCheckpoint Checkpoint = 1;
+ optional uint64 TaskId = 2;
+ optional ERestoreStatus Status = 3;
optional TMessageTransportMeta TransportMeta = 100;
-}
+}
message TMessageTransportMeta {
optional uint64 SeqNo = 1; // SeqNo of message
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index 211d4ac39a..bfa0238d33 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -546,17 +546,17 @@ public:
}
void Push(NDqProto::TCheckpoint&& checkpoint) override {
- YQL_ENSURE(!Checkpoint);
- Checkpoint.ConstructInPlace(std::move(checkpoint));
- }
-
+ YQL_ENSURE(!Checkpoint);
+ Checkpoint.ConstructInPlace(std::move(checkpoint));
+ }
+
[[nodiscard]]
bool Pop(NDqProto::TData& data, ui64 bytes) override {
LOG("Pop request, rows in memory: " << DataHead.size() << "/" << DataTail.size()
<< ", spilled rows: " << SpilledRows << ", spilled blobs: " << SpilledBlobs.size()
<< ", finished: " << Finished << ", requested: " << bytes << ", maxChunkBytes: " << MaxChunkBytes);
- if (!HasData()) {
+ if (!HasData()) {
if (Finished) {
data.SetTransportVersion(DataSerializer.GetTransportVersion());
data.SetRows(0);
@@ -670,15 +670,15 @@ public:
[[nodiscard]]
bool Pop(NDqProto::TCheckpoint& checkpoint) override {
- if (!HasData() && Checkpoint) {
+ if (!HasData() && Checkpoint) {
checkpoint = std::move(*Checkpoint);
Checkpoint = Nothing();
- return true;
- }
- return false;
- }
-
- [[nodiscard]]
+ return true;
+ }
+ return false;
+ }
+
+ [[nodiscard]]
bool PopAll(NDqProto::TData& data) override {
YQL_ENSURE(!SpilledRows);
YQL_ENSURE(DataTail.empty());
@@ -742,10 +742,10 @@ public:
}
}
- bool HasData() const override {
- return !DataHead.empty() || SpilledRows != 0 || !DataTail.empty();
- }
-
+ bool HasData() const override {
+ return !DataHead.empty() || SpilledRows != 0 || !DataTail.empty();
+ }
+
bool IsFinished() const override {
return Finished && !HasData();
}
@@ -822,7 +822,7 @@ private:
ui64 SpilledRows = 0;
bool Finished = false;
-
+
TMaybe<NDqProto::TCheckpoint> Checkpoint;
};
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h
index f7a4887d2d..57f794ae8d 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.h
@@ -40,7 +40,7 @@ public:
[[nodiscard]]
virtual bool Pop(NDqProto::TData& data, ui64 bytes) = 0;
// Pop chechpoint. Checkpoints may be taken from channel even after it is finished.
- [[nodiscard]]
+ [[nodiscard]]
virtual bool Pop(NDqProto::TCheckpoint& checkpoint) = 0;
// Only for data-queries
// TODO: remove this method and create independent Data- and Stream-query implementations.
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 10900352c3..7192c6fb38 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -620,14 +620,14 @@ public:
return Stats.get();
}
- TString Save() const override {
- return ProgramParsed.CompGraph->SaveGraphState();
- }
-
- void Load(TStringBuf in) override {
- ProgramParsed.CompGraph->LoadGraphState(in);
- }
-
+ TString Save() const override {
+ return ProgramParsed.CompGraph->SaveGraphState();
+ }
+
+ void Load(TStringBuf in) override {
+ ProgramParsed.CompGraph->LoadGraphState(in);
+ }
+
private:
NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv() {
return Context.TypeEnv ? *Context.TypeEnv : *SelfTypeEnv;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index a0cc816e91..56c902c858 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -176,10 +176,10 @@ public:
virtual void UpdateStats() = 0;
virtual const TDqTaskRunnerStats* GetStats() const = 0;
-
- [[nodiscard]]
- virtual TString Save() const = 0;
- virtual void Load(TStringBuf in) = 0;
+
+ [[nodiscard]]
+ virtual TString Save() const = 0;
+ virtual void Load(TStringBuf in) = 0;
};
TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index cdcbd32ef5..00b8bd3540 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -616,10 +616,10 @@ public:
}
}
- bool Empty() const override {
- ythrow yexception() << "unimplemented";
- }
-
+ bool Empty() const override {
+ ythrow yexception() << "unimplemented";
+ }
+
void Push(NDqProto::TData&& data) override {
try {
return Delegate->Push(std::move(data));
@@ -957,10 +957,10 @@ public:
}
void Push(NDqProto::TCheckpoint&& checkpoint) override {
- Y_UNUSED(checkpoint);
- ythrow yexception() << "unimplemented";
- }
-
+ Y_UNUSED(checkpoint);
+ ythrow yexception() << "unimplemented";
+ }
+
void Finish() override {
try {
NDqProto::TCommandHeader header;
@@ -976,11 +976,11 @@ public:
// |>
// <| consumer methods
-
- bool HasData() const override {
- ythrow yexception() << "unimplemented";
- }
-
+
+ bool HasData() const override {
+ ythrow yexception() << "unimplemented";
+ }
+
bool IsFinished() const override {
try {
return Delegate->IsFinished();
@@ -998,11 +998,11 @@ public:
TaskRunner->RaiseException();
}
}
-
+
bool Pop(NDqProto::TCheckpoint&) override {
return false;
- }
-
+ }
+
// Only for data-queries
// TODO: remove this method and create independent Data- and Stream-query implementations.
// Stream-query implementation should be without PopAll method.
@@ -1620,15 +1620,15 @@ public:
}
}
- TString Save() const override {
- ythrow yexception() << "unimplemented";
- }
-
- void Load(TStringBuf in) override {
- Y_UNUSED(in);
- ythrow yexception() << "unimplemented";
- }
+ TString Save() const override {
+ ythrow yexception() << "unimplemented";
+ }
+ void Load(TStringBuf in) override {
+ Y_UNUSED(in);
+ ythrow yexception() << "unimplemented";
+ }
+
private:
TIntrusivePtr<TTaskRunner> Delegate;
NDqProto::TDqTask Task;