diff options
author | Dmitry Kardymon <[email protected]> | 2025-07-10 15:59:46 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-07-10 15:59:46 +0300 |
commit | f43a538dcbe3e8364ff021134c0f1177814aa57f (patch) | |
tree | fa6a77bd6e432832f42b572709604c1fa67c728b | |
parent | 9f6d559183eb9fb02d78c58ea6bb61179c9a025e (diff) |
YQ-2643 Disable coordinator if no ingress tasks (#1113)oidc-1.2.1
3 files changed, 47 insertions, 8 deletions
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp index d7ff17db741..f080732147a 100644 --- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp @@ -80,6 +80,14 @@ void TCheckpointCoordinator::Handle(NFq::TEvCheckpointCoordinator::TEvReadyState AllActorsSet.insert(actorId); } + CC_LOG_D("AllActors count: " << AllActors.size() << ", ActorsToTrigger count: " << ActorsToTrigger.size() << ", ActorsToNotify count: " << ActorsToNotify.size() << ", ActorsToWaitFor count: " << ActorsToWaitFor.size()); + + if (ActorsToTrigger.empty()) { + CC_LOG_D("No ingress tasks, coordinator was disabled"); + StartAllTasks(); + return; + } + PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size()); CC_LOG_D("Send TEvRegisterCoordinatorRequest"); @@ -408,8 +416,12 @@ void TCheckpointCoordinator::InjectCheckpoint(const TCheckpointId& checkpointId, transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvInjectCheckpoint(checkpointId.SeqNo, checkpointId.CoordinatorGeneration, type)); } + StartAllTasks(); +} + +void TCheckpointCoordinator::StartAllTasks() { if (!GraphIsRunning) { - CC_LOG_I("[" << checkpointId << "] Send TEvRun to all actors"); + CC_LOG_I("Send TEvRun to all actors"); for (const auto& [actor, transport] : AllActors) { transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvRun()); } diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h index 263903467b2..2c48f08e1a5 100644 --- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h +++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h @@ -91,6 +91,7 @@ private: void PassAway() override; void RestoreFromOwnCheckpoint(const TCheckpointMetadata& checkpoint); void TryToRestoreOffsetsFromForeignCheckpoint(const TCheckpointMetadata& checkpoint); + void StartAllTasks(); void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message, const NYql::TIssues& subIssues); void OnInternalError(const TString& message, const NYql::TIssues& subIssues = {}); diff --git a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp index 8872e7d2080..fd2df55e6de 100644 --- a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp +++ b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp @@ -4,6 +4,7 @@ #include <ydb/core/testlib/basics/helpers.h> #include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/gtest.h> #include <ydb/library/actors/core/executor_pool_basic.h> #include <ydb/library/actors/core/scheduler_basic.h> @@ -23,7 +24,7 @@ enum ETestGraphFlags : ui64 { SourceWithChannelInOneTask = 2, }; -NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) { +NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType) { NYql::NDqProto::TReadyState result; @@ -33,7 +34,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) { ingressOutput->AddChannels(); if (flags & ETestGraphFlags::InputWithSource) { auto* source = ingress->AddInputs()->MutableSource(); - source->SetType("PqSource"); + source->SetType(sourceType); } auto* map = result.AddTask(); @@ -44,7 +45,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) { mapOutput->AddChannels(); if (flags & ETestGraphFlags::SourceWithChannelInOneTask) { auto* source = map->AddInputs()->MutableSource(); - source->SetType("PqSource"); + source->SetType(sourceType); } auto* egress = result.AddTask(); @@ -78,9 +79,9 @@ struct TTestBootstrap : public TTestActorRuntime { ::NMonitoring::TDynamicCounterPtr Counters = new ::NMonitoring::TDynamicCounters(); - explicit TTestBootstrap(ui64 graphFlags = 0, ui64 snaphotRotationPeriod = 0) + explicit TTestBootstrap(ui64 graphFlags, ui64 snaphotRotationPeriod, const TString& sourceType) : TTestActorRuntime(true) - , GraphState(BuildTestGraph(graphFlags)) + , GraphState(BuildTestGraph(graphFlags, sourceType)) , CoordinatorId("my-graph-id", 42) , CheckpointId1(CoordinatorId.Generation, 1) , CheckpointId2(CoordinatorId.Generation, 2) @@ -219,6 +220,12 @@ struct TTestBootstrap : public TTestActorRuntime { return google::protobuf::util::MessageDifferencer::Equals(lhs.Record, rhs.Record); } + bool IsEqual( + const NYql::NDq::TEvDqCompute::TEvRun& lhs, + const NYql::NDq::TEvDqCompute::TEvRun& rhs) { + return google::protobuf::util::MessageDifferencer::Equals(lhs.Record, rhs.Record); + } + template <typename TEvent> void ExpectEvent(NActors::TActorId actorId, const TEvent& expectedEventValue) { auto eventHolder = GrabEdgeEvent<TEvent>(actorId, TDuration::Seconds(10)); @@ -330,8 +337,8 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { class CheckpointsTestHelper : public TTestBootstrap { public: - CheckpointsTestHelper(ui64 graphFlags, ui64 snaphotRotationPeriod = 0) - : TTestBootstrap(graphFlags, snaphotRotationPeriod) { + CheckpointsTestHelper(ui64 graphFlags, ui64 snaphotRotationPeriod = 0, const TString& sourceType = "PqSource") + : TTestBootstrap(graphFlags, snaphotRotationPeriod, sourceType) { } void RegisterCoordinator() { @@ -445,12 +452,19 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { void ScheduleCheckpointing() { MockScheduleCheckpointing(); } + + void ExpectRun() { + ExpectEvent(IngressActor, NYql::NDq::TEvDqCompute::TEvRun()); + ExpectEvent(MapActor, NYql::NDq::TEvDqCompute::TEvRun()); + ExpectEvent(EgressActor, NYql::NDq::TEvDqCompute::TEvRun()); + } }; Y_UNIT_TEST(ShouldTriggerCheckpointWithSource) { CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0); test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); + test.ExpectRun(); test.AllSavedAndCommited(test.CheckpointId1); test.MockRunGraph(); } @@ -459,6 +473,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, 0); test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); + test.ExpectRun(); test.AllSavedAndCommited(test.CheckpointId1); test.MockRunGraph(); } @@ -467,6 +482,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0); test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); + test.ExpectRun(); test.AllSavedAndCommited(test.CheckpointId1); test.MockRunGraph(); @@ -479,6 +495,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 2); test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); + test.ExpectRun(); test.AllSavedAndCommited(test.CheckpointId1); test.MockRunGraph(); @@ -499,11 +516,20 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0); test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); + test.ExpectRun(); test.SaveFailed(test.CheckpointId1); test.ScheduleCheckpointing(); test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_SNAPSHOT); } + + Y_UNIT_TEST(ShouldDoNothingIfNoIngressTasks) { + CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0, "S3Source"); + test.ExpectRun(); + ASSERT_THROW( + test.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>(test.StorageProxy, TDuration::Seconds(10)), + NActors::TEmptyEventQueueException); + } } } // namespace NFq |